Commit 1cbf520d authored by IKEDA Soji's avatar IKEDA Soji
Browse files

Refactoring task_manager.pl: Introducing Sympa::Spool::Task & Sympa::Spindle::ProcessTask

parent 12522de1
......@@ -137,6 +137,7 @@ nobase_modules_DATA = \
Sympa/Spindle/ProcessModeration.pm \
Sympa/Spindle/ProcessOutgoing.pm \
Sympa/Spindle/ProcessRequest.pm \
Sympa/Spindle/ProcessTask.pm \
Sympa/Spindle/ProcessTemplate.pm \
Sympa/Spindle/ResendArchive.pm \
Sympa/Spindle/ToAlarm.pm \
......@@ -163,6 +164,7 @@ nobase_modules_DATA = \
Sympa/Spool/Held.pm \
Sympa/Spool/Incoming.pm \
Sympa/Spool/Moderation.pm \
Sympa/Spool/Task.pm \
Sympa/Task.pm \
Sympa/Template.pm \
Sympa/Ticket.pm \
......
This diff is collapsed.
# -*- indent-tabs-mode: nil; -*-
# vim:ft=perl:et:sw=4
# $Id$
# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright 2018 The Sympa Community. See the AUTHORS.md file at the
# top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
package Sympa::Spool::Task;
use strict;
use warnings;
use Conf;
use Sympa::List;
use base qw(Sympa::Spool);
sub _directories {
return {directory => $Conf::Conf{'queuetask'},};
}
use constant _generator => 'Sympa::Task';
use constant _marshal_format => '%ld.%s.%s.%s@%s';
use constant _marshal_keys => [qw(date label model localpart domainpart)];
use constant _marshal_regexp =>
qr{\A(\d+)[.](\w*)[.](\w+)[.](?:([^\s\@]*)\@([\w\.\-*]*)|_global)\z};
sub _filter {
my $self = shift;
my $metadata = shift;
return undef unless $metadata and defined $metadata->{date};
return 0 if time < $metadata->{date};
return 1;
}
sub _load {
my $self = shift;
unless ($self->{_glob_pattern}) {
$self->_create_all_tasks();
}
$self->SUPER::_load();
}
sub quarantine {
my $self = shift;
my $handle = shift;
$self->remove($handle);
}
# Private function to create all necessary tasks.
sub _create_all_tasks {
my $self = shift;
my $current_date = time;
my $existing_tasks = $self->_existing_tasks;
# Create global tasks.
foreach my $model (keys %{Sympa::Task::site_models()}) {
next if ${$existing_tasks->{'*'} || {}}{$model};
my $task = $self->_generator->new(
context => '*',
date => $current_date,
model => $model
);
next unless $task;
$self->store($task);
}
# Create list tasks.
foreach my $robot (Sympa::List::get_robots()) {
my $all_lists = Sympa::List::get_lists($robot);
foreach my $list (@{$all_lists || []}) {
foreach my $model (keys %{Sympa::Task::list_models()}) {
next if ${$existing_tasks->{$list->get_id} || {}}{$model};
next unless $list->{'admin'}{'status'} eq 'open';
if ($model eq 'sync_include') {
next unless $list->has_include_data_sources;
}
my $task = $self->_generator->new(
context => $list,
date => $current_date,
model => $model
);
next unless $task;
$self->store($task);
}
}
}
}
# Private function to list all existing tasks.
# Old name: Sympa::Task::list_tasks().
sub _existing_tasks {
my $self = shift;
my $existing_tasks = {};
# Get all entries.
$self->{_metadatas} = $self->SUPER::_load()
or return {};
while (1) {
my ($task, $handle) = $self->next(no_filter => 1, no_lock => 1);
if ($task and $handle) {
my $id =
(ref $task->{context} eq 'Sympa::List')
? $task->{context}->get_id
: '*';
my $model = $task->{model};
$existing_tasks->{$id}{$model} = 1;
} elsif ($handle) {
next;
} else {
last;
}
}
return $existing_tasks;
}
1;
__END__
=encoding utf-8
=head1 NAME
Sympa::Spool::Task - Spool for tasks
=head1 SYNOPSIS
use Sympa::Spool::Task;
my $spool = Sympa::Spool::Task->new;
$spool->store($task);
my ($task, $handle) = $spool->next;
=head1 DESCRIPTION
L<Sympa::Spool::Task> implements the spool for tasks.
=head2 Methods
See also L<Sympa::Spool/"Public methods">.
=over
=item next ( [ no_filter =E<gt> 1 ], [ no_lock =E<gt> 1 ] )
Order is controlled by date element of file name.
if C<no_filter> is I<not> set,
messages with date newer than current time are skipped.
All necessary tasks are created and stored into spool in advance.
=item quarantine ( $handle )
Removes a task: The same as remove().
This spool does not have C<bad/> subdirectory.
=back
=head2 Context and metadata
See also L<Sympa::Spool/"Marshaling and unmarshaling metadata">.
This class particularly gives following metadata:
=over
=item {date}
Unix time when task will be executed at the next time.
=item {label}
=item {model}
TBD.
=back
=head1 CONFIGURATION PARAMETERS
Following site configuration parameters in sympa.conf will be referred.
=over
=item queuetask
Directory path of task spool.
=back
=head1 SEE ALSO
L<task_manager(8)>, L<Sympa::Spool>, L<Sympa::Task>.
=head1 HISTORY
L<Sympa::Spool::Task> appeared on Sympa 6.2.XX.
=cut
......@@ -8,6 +8,9 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2018 The Sympa Community. See the AUTHORS.md file at the
# top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -26,153 +29,494 @@ package Sympa::Task;
use strict;
use warnings;
use Scalar::Util;
use Template;
use Sympa::List;
use Sympa;
use Conf;
use Sympa::Log;
use Sympa::Regexps;
use Sympa::Tools::Data;
my $log = Sympa::Log->instance;
my @task_list;
my %task_by_list;
my %task_by_model;
## Creates a new Sympa::Task object
# List of list task models. FIXME:Refer Sympa::ListDef.
use constant list_models => {
#expire => 'expire_task', # Not yet implemented.
remind => 'remind_task',
sync_include => '',
};
# List of global task models. FIXME:Refer Sympa::ConfDef.
use constant site_models => {
expire_bounce => 'expire_bounce_task',
purge_user_table => 'purge_user_table_task',
purge_logs_table => 'purge_logs_table_task',
purge_session_table => 'purge_session_table_task',
purge_spools => 'purge_spools_task',
purge_tables => 'purge_tables_task',
purge_one_time_ticket_table => 'purge_one_time_ticket_table_task',
purge_orphan_bounces => 'purge_orphan_bounces_task',
eval_bouncers => 'eval_bouncers_task',
process_bouncers => 'process_bouncers_task',
};
# Creates a new Sympa::Task object.
# Old name: create() in task_manager.pl, entirely rewritten.
sub new {
my ($pkg, $file) = @_;
my $task;
$log->syslog('debug2', '(%s)', $file);
$task->{'filepath'} = $file;
## We might get a filepath
## Extract filename from path
my @path = split /\//, $file;
$task->{'filename'} = $path[$#path];
my $listname_regexp = Sympa::Regexps::listname();
my $host_regexp = Sympa::Regexps::host();
## File including the list domain
if ($task->{'filename'} =~
/^(\d+)\.(\w*)\.(\w+)\.($listname_regexp|_global)\@($host_regexp)$/) {
$task->{'date'} = $1;
$task->{'label'} = $2;
$task->{'model'} = $3;
$task->{'object'} = $4;
$task->{'domain'} = $5;
if ($task->{'object'} ne '_global') { # list task
$task->{'list_object'} =
Sympa::List->new($task->{'object'}, $task->{'domain'});
$task->{'domain'} = $task->{'list_object'}{'domain'};
my $class = shift;
# Optional serialized content.
my $serialized;
if (@_ and ($_[0] eq '' or $_[0] =~ /\n/)) {
$serialized = shift;
}
my %options = @_;
die 'bug in logic. Ask developer'
unless defined $options{model} and length $options{model};
$options{context} = '*'
unless ref $options{context} eq 'Sympa::List'; #FIXME
$options{date} = time
unless defined $options{date};
$options{label} = ($options{model} eq 'sync_include') ? 'INIT' : ''
unless defined $options{label};
my $self = bless {%options} => $class;
unless (defined $serialized) {
my $that = $self->{context};
my $model = $self->{model};
my $name;
my $pname;
if (ref $that eq 'Sympa::List' and $model eq 'sync_include') {
$name = 'ttl';
} elsif (ref $that eq 'Sympa::List'
and $pname = ${list_models()}{$model}) {
$name = $that->{'admin'}{$pname}->{'name'};
} elsif ($that eq '*' and $pname = ${site_models()}{$model}) {
$name = Conf::get_robot_conf($that, $pname);
} else {
$log->syslog('err', 'Unknown task %s for %s', $model, $that);
return undef;
}
unless ($name) {
$log->syslog('debug3', 'Inactive task %s for %s', $model, $that);
return undef;
}
} elsif ($task->{'filename'} =~
/^(\d+)\.(\w*)\.(\w+)\.($listname_regexp|_global)$/) {
$task->{'date'} = $1;
$task->{'label'} = $2;
$task->{'model'} = $3;
$task->{'object'} = $4;
my $model_name = sprintf '%s.%s.task', $model, $name;
my $model_file = Sympa::search_fullpath(
$that,
$model_name,
subdir => (
(ref $that eq 'Sympa::List')
? 'list_task_models'
: 'global_task_models'
)
);
unless ($model_file) {
$log->syslog('err', 'Unable to find task file %s for %s',
$model_name, $that);
return undef;
}
if ($task->{'object'} ne '_global') { # list task
$task->{'list_object'} = Sympa::List->new($task->{'object'});
$task->{'domain'} = $task->{'list_object'}{'domain'};
# creation
my $data = {
creation_date => $self->{date}, # Compat., has never used
execution_date => 'execution_date', # Compat.
};
if (ref $that eq 'Sympa::List') {
$data->{domain} = $that->{'domain'}; # New on 6.2.35b
$data->{list} = {
name => $that->{'name'},
robot => $that->{'domain'}, # Compat., has never used
ttl => $that->{'admin'}{'ttl'},
};
}
} else {
$log->syslog('err', 'Unknown format for task "%s"',
$task->{'filename'});
my $tt2 = Template->new(
{ 'START_TAG' => quotemeta('['),
'END_TAG' => quotemeta(']'),
'ABSOLUTE' => 1
}
);
unless ($tt2 and $tt2->process($model_file, $data, \$serialized)) {
$log->syslog('err', 'Failed to parse task template "%s": %s',
$model_file, $tt2->error);
return undef;
}
}
unless ($self->_parse($serialized)) {
$log->syslog('err', 'Syntax error in task file. You should check %s',
$self);
return undef;
}
$self;
}
$task->{'id'} = $task->{'list_object'}{'name'};
$task->{'id'} .= '@' . $task->{'domain'} if (defined $task->{'domain'});
### DEFINITION OF AVAILABLE COMMANDS FOR TASKS ###
my $date_arg_regexp1 = '\d+|execution_date';
my $date_arg_regexp2 = '(\d\d\d\dy)(\d+m)?(\d+d)?(\d+h)?(\d+min)?(\d+sec)?';
my $date_arg_regexp3 =
'(\d+|execution_date)(\+|\-)(\d+y)?(\d+m)?(\d+w)?(\d+d)?(\d+h)?(\d+min)?(\d+sec)?';
my $delay_regexp = '(\d+y)?(\d+m)?(\d+w)?(\d+d)?(\d+h)?(\d+min)?(\d+sec)?';
my $var_regexp = '@\w+';
my $subarg_regexp = '(\w+)(|\((.*)\))';
# commands which use a variable. If you add such a command, the first
# parameter must be the variable
my %var_commands = (
'delete_subs' => ['var'],
# variable
'send_msg' => ['var', '\w+'],
#variable template
'rm_file' => ['var'],
# variable
);
# commands which are used for assignments
my %asgn_commands = (
'select_subs' => ['subarg'],
# condition
'delete_subs' => ['var'],
# variable
);
# regular commands
my %commands = (
'next' => ['date', '\w*'],
# date label
'stop' => [],
'create' => ['subarg', '\w+', '\w+'],
#object model model choice
'exec' => ['.+'],
#file #delay
'expire_bounce' => ['\d+'],
#template date
'sync_include' => [],
'purge_user_table' => [],
'purge_logs_table' => [],
'purge_session_table' => [],
'purge_spools' => [],
'purge_tables' => [],
'purge_one_time_ticket_table' => [],
'purge_orphan_bounces' => [],
'eval_bouncers' => [],
'process_bouncers' => [],
%var_commands,
%asgn_commands,
);
### SYNTAX CHECKING SUBROUTINES ###
# Check the syntax of a task.
# Old name: check() in task_manager.pl.
sub _parse {
$log->syslog('debug2', '(%s, ...)', @_);
my $self = shift;
my $serialized = shift;
my $lnb = 0; # line number
my %used_labels; # list of labels used as parameter in commands
my %labels; # list of declared labels
my %used_vars; # list of vars used as parameter in commands
my %vars; # list of declared vars
return undef unless defined $serialized;
$self->{_source} = $serialized;
$self->{_parsed} = [];
foreach my $line (split /\r\n|\r|\n/, $serialized) {
$lnb++;
next if $line =~ /^\s*\#/;
my %result;
unless (_chk_line($line, \%result)) {
$log->syslog('err', 'Error at line %s: %s', $lnb, $line);
$log->syslog('err', '%s', $result{'error'});
return undef;
}
## Bless Sympa::Task object
bless $task, $pkg;
if ($result{'nature'} eq 'assignment') {
if (_chk_cmd(
$result{'command'}, $lnb,
$result{'Rarguments'}, \%used_labels,
\%used_vars
)
) {
$vars{$result{'var'}} = 1;
} else {
return undef;
}
}
if ($result{'nature'} eq 'command') {
return undef
unless _chk_cmd($result{'command'}, $lnb,
$result{'Rarguments'}, \%used_labels, \%used_vars);
}
return $task;
$labels{$result{'label'}} = 1 if $result{'nature'} eq 'label';
push @{$self->{_parsed}}, {%result, line => $lnb};
}
# are all labels used ?
foreach my $label (keys %labels) {
$log->syslog('debug3', 'Warning: Label %s exists but is not used',
$label)
unless $used_labels{$label};
}
# do all used labels exist ?
foreach my $label (keys %used_labels) {
unless ($labels{$label}) {
$log->syslog('err', 'Label %s is used but does not exist',
$label);
return undef;
}
}
# are all variables used ?
foreach my $var (keys %vars) {
$log->syslog('notice', 'Warning: Var %s exists but is not used', $var)
unless $used_vars{$var};
}
# do all used variables exist ?
foreach my $var (keys %used_vars) {
unless ($vars{$var}) {
$log->syslog('err', 'Var %s is used but does not exist', $var);
return undef;
}
}
return 1;
}
## Build all Sympa::Task objects
sub list_tasks {
my $spool_task = shift;
my $filter = shift;
# Check a task line.
# Old name: chk_line() in task_manager.pl.
sub _chk_line {
my $line = shift;
my $Rhash = shift;
## Create required tasks
unless (opendir(DIR, $spool_task)) {
$log->syslog('err', 'Can\'t open dir %s: %m', $spool_task);
## just in case...
chomp $line;
$log->syslog('debug2', '(%s, %s)', $line, $Rhash->{'nature'});
$Rhash->{'nature'} = undef;
# empty line
unless (length $line) {
$Rhash->{'nature'} = 'empty line';
return 1;
}
my @task_files =
sort epoch_sort (grep !/^\.\.?$/, readdir DIR); # @tasks updating
closedir DIR;
$log->syslog('debug', "Listing all tasks");
## Reset the list of tasks
undef @task_list;
undef %task_by_list;
undef %task_by_model;
# comment
if ($line =~ /^\s*\#.*/) {
$Rhash->{'nature'} = 'comment';
return 1;
}
## Create Sympa::Task objects
foreach my $t (@task_files) {
next if ($t =~ /^\./);