Commit c323685c authored by sikeda's avatar sikeda
Browse files

[-dev] (con'd r11556) Now Sympa::Bulk would be instantiated.

git-svn-id: https://subversion.renater.fr/sympa/branches/sympa-6.2-branch@11571 05aa8bb8-cd2b-0410-b1d7-8918dfa770ce
parent 1601bb1f
......@@ -86,6 +86,8 @@ process();
exit 0;
sub process {
my $bulk = Sympa::Bulk->new;
my $sth = SDM::do_prepared_query(
q{SELECT *
FROM bulkmailer_table
......@@ -169,7 +171,7 @@ sub process {
my $marshalled;
if ($options{dry_run}) {
$marshalled =
Sympa::Bulk::store($message, $rcpt, tag => $message->{tag});
$bulk->store($message, $rcpt, tag => $message->{tag});
} else {
$marshalled = tools::marshal_metadata(
$message,
......
......@@ -66,6 +66,8 @@ unless (($GID == (getgrnam(Sympa::Constants::GROUP))[2])
# Sets the UMASK
umask(oct($Conf::Conf{'umask'}));
my $bulk = Sympa::Bulk->new;
my $spool_dir = $Conf::Conf{'queue'};
my ($dh, $filename);
......@@ -126,7 +128,7 @@ sub process {
unless $options{dry_run};
Log::do_log('info', '%s: Moved to msg spool', $message);
} else {
Sympa::Bulk::store($message, [split /\s*,\s*/, $message->{rcpt}])
$bulk->store($message, [split /\s*,\s*/, $message->{rcpt}])
unless $options{dry_run};
Log::do_log('info', '%s: Moved to bulk spool', $message);
}
......
......@@ -973,6 +973,8 @@ my $query;
 
my $birthday = time;
 
my $bulk = Sympa::Bulk->new;
Log::do_log('info', 'WWSympa started, process %d', $PID);
 
# Now internal encoding is same as input/output.
......@@ -1049,9 +1051,6 @@ while ($query = new_loop()) {
Sympa::Crash::register_handler();
 
## Get params in a hash
# foreach ($query->param) {
# $in{$_} = $query->param($_);
# }
%in = $query->Vars;
 
foreach my $k (keys %::changed_params) {
......@@ -10438,8 +10437,7 @@ sub do_send_me {
# Overwrite original envelope sender. It is REQUIRED for delivery.
$message->{envelope_sender} = $list->get_list_address('return_path');
 
unless (
defined Sympa::Bulk::store($message, $param->{'user'}{'email'})) {
unless (defined $bulk->store($message, $param->{'user'}{'email'})) {
$param->{'status'} = 'message_err';
wwslog(
'err',
......@@ -22165,7 +22163,7 @@ sub do_send_mail {
$u_message->{priority} =
Conf::get_robot_conf($robot, 'sympa_priority');
 
unless (defined Sympa::Bulk::store($u_message, [@to_user])) {
unless (defined $bulk->store($u_message, [@to_user])) {
Sympa::Report::reject_report_web(
'intern',
'cannot_send_mail',
......
......@@ -36,8 +36,6 @@ use tools;
use base qw(Class::Singleton);
my $mailer = Sympa::Mailer->instance;
# Constructor for Class::Singleton.
sub _new_instance {
my $class = shift;
......@@ -54,7 +52,8 @@ sub store {
my $rcpt = shift;
my %options = @_;
my $use_bulk = $self->{use_bulk};
my $mailer =
$self->{use_bulk} ? Sympa::Bulk->new : Sympa::Mailer->instance;
my $operation = $options{operation};
my $robot_id;
......@@ -92,11 +91,7 @@ sub store {
$message->{priority} =
Conf::get_robot_conf($robot_id, 'sympa_priority');
if ($use_bulk) {
return Sympa::Bulk::store($message, $rcpt);
} else {
return $mailer->store($message, $rcpt);
}
return $mailer->store($message, $rcpt);
}
}
......@@ -104,8 +99,9 @@ sub flush {
my $self = shift;
my %options = @_;
my $use_bulk = $self->{use_bulk};
my $purge = $options{purge};
my $mailer =
$self->{use_bulk} ? Sympa::Bulk->new : Sympa::Mailer->instance;
my $purge = $options{purge};
foreach my $robot_id (keys %{$self->{stack}}) {
foreach my $operation (keys %{$self->{stack}->{$robot_id}}) {
......@@ -159,13 +155,7 @@ sub flush {
) unless $operation eq 'logs_failed';
return undef;
}
my $status;
if ($use_bulk) {
$status = Sympa::Bulk::store($message, $rcpt);
} else {
$status = $mailer->store($message, $rcpt);
}
unless (defined $status) {
unless (defined $mailer->store($message, $rcpt)) {
Log::do_log(
'notice',
'Unable to send template "listmaster_groupnotification" to %s listmaster %s',
......
......@@ -37,43 +37,54 @@ use Sympa::Message;
use tools;
# Cache of spool.
our $metadatas;
sub new {
my $class = shift;
bless {
msg_directory => $Conf::Conf{'queuebulk'} . '/msg',
pct_directory => $Conf::Conf{'queuebulk'} . '/pct',
bad_directory => $Conf::Conf{'queuebulk'} . '/bad',
metadatas => undef,
} => $class;
}
sub next {
my $msg_spool_dir = $Conf::Conf{'queuebulk'} . '/msg';
my $pct_spool_dir = $Conf::Conf{'queuebulk'} . '/pct';
my $self = shift;
unless ($metadatas) {
unless ($self->{metadatas}) {
my $cwd = Cwd::getcwd();
unless (chdir $pct_spool_dir) {
die sprintf 'Cannot chdir to %s: %s', $pct_spool_dir, $ERRNO;
unless (chdir $self->{pct_directory}) {
die sprintf 'Cannot chdir to %s: %s', $self->{pct_directory},
$ERRNO;
}
$metadatas = [
$self->{metadatas} = [
sort grep {
!/,lock/
and !m{(?:\A|/)(?:\.|T\.|BAD-)}
and -f ($pct_spool_dir . '/' . $_)
and -f ($self->{pct_directory} . '/' . $_)
} glob '*/*'
];
chdir $cwd;
}
unless (@{$metadatas}) {
undef $metadatas;
unless (@{$self->{metadatas}}) {
undef $self->{metadatas};
return;
}
my ($lock_fh, $metadata, $message);
while (my $marshalled = shift @{$metadatas}) {
while (my $marshalled = shift @{$self->{metadatas}}) {
# Try locking packet. Those locked or removed by other process will
# be skipped.
$lock_fh = Sympa::LockedFile->new($pct_spool_dir . '/' . $marshalled,
$lock_fh =
Sympa::LockedFile->new($self->{pct_directory} . '/' . $marshalled,
-1, '+<');
next unless $lock_fh;
# FIXME: The list or the robot that injected packet can no longer be
# available.
$metadata = tools::unmarshal_metadata(
$pct_spool_dir,
$self->{pct_directory},
$marshalled,
qr{\A(\w+)\.(\w+)\.(\d+)\.(\d+\.\d+)\.([^\s\@]*)\@([\w\.\-*]*)_(\w+),(\d+),(\d+)/(\w+)\z},
[ qw(priority packet_priority date time localpart domainpart tag pid rand serial)
......@@ -90,7 +101,8 @@ sub next {
[ qw(priority packet_priority date time localpart domainpart tag pid rand)
]
);
$message = Sympa::Message->new_from_file($msg_spool_dir . '/' . $msg_file, %$metadata);
$message = Sympa::Message->new_from_file(
$self->{msg_directory} . '/' . $msg_file, %$metadata);
if ($message) {
my $rcpt_string = do { local $RS; <$lock_fh> };
$message->{rcpt} = [split /\n+/, $rcpt_string];
......@@ -104,10 +116,10 @@ sub next {
}
sub quarantine {
my $self = shift;
my $lock_fh = shift;
my $pct_spool_dir = $Conf::Conf{'queuebulk'} . '/pct';
my $bad_dir = $Conf::Conf{'queuebulk'} . '/bad/' . $lock_fh->basename(1);
my $bad_dir = $self->{bad_directory} . '/' . $lock_fh->basename(1);
my $bad_file;
$bad_file = $bad_dir . '/' . $lock_fh->basename;
......@@ -115,22 +127,22 @@ sub quarantine {
return 1 if -d $bad_dir and $lock_fh->rename($bad_file);
$bad_file =
$pct_spool_dir . '/BAD-'
$self->{pct_directory} . '/BAD-'
. $lock_fh->basename(1) . '-'
. $lock_fh->basename;
return $lock_fh->rename($bad_file);
}
sub remove {
my $self = shift;
my $lock_fh = shift;
my $msg_spool_dir = $Conf::Conf{'queuebulk'} . '/msg';
my $pct_spool_dir = $Conf::Conf{'queuebulk'} . '/pct';
my $marshalled = $lock_fh->basename(1);
my $marshalled = $lock_fh->basename(1);
if ($lock_fh->unlink) {
if (rmdir($pct_spool_dir . '/' . $marshalled)) { # No more packet.
unlink($msg_spool_dir . '/' . $marshalled);
if (rmdir($self->{pct_directory} . '/' . $marshalled)) {
# No more packet.
unlink($self->{msg_directory} . '/' . $marshalled);
}
return 1;
}
......@@ -152,14 +164,11 @@ sub remove {
# sub merge_data ($rcpt, $listname, $robot_id, $data, $body, \$message_output)
sub store {
Log::do_log('debug2', '(%s, ...)', @_);
my $self = shift;
my $message = shift->dup;
my $rcpt = shift;
my %options = @_;
my $msg_spool_dir = $Conf::Conf{'queuebulk'} . '/msg';
my $pct_spool_dir = $Conf::Conf{'queuebulk'} . '/pct';
my ($list, $robot_id);
if (ref($message->{context}) eq 'Sympa::List') {
$list = $message->{context};
......@@ -188,7 +197,7 @@ sub store {
# are created bulk.pl may distribute them.
my $marshalled = tools::store_spool(
$msg_spool_dir,
$self->{msg_directory},
$message,
'%s.%s.%d.%f.%s@%s_%s,%ld,%d',
[ qw(priority packet_priority date time localpart domainpart tag PID RAND)
......@@ -197,10 +206,13 @@ sub store {
);
return unless $marshalled;
unless (mkdir($pct_spool_dir . '/' . $marshalled)) {
Log::do_log('err', 'Cannot mkdir %s/%s: %m',
$pct_spool_dir, $marshalled);
unlink($msg_spool_dir . '/' . $marshalled);
unless (mkdir($self->{pct_directory} . '/' . $marshalled)) {
Log::do_log(
'err',
'Cannot mkdir %s/%s: %m',
$self->{pct_directory}, $marshalled
);
unlink($self->{msg_directory} . '/' . $marshalled);
return;
}
......@@ -216,7 +228,7 @@ sub store {
my $serial = $message->{tag};
foreach my $rcpt (@rcpts) {
my $lock_fh = Sympa::LockedFile->new(
$pct_spool_dir . '/' . $marshalled . '/' . $serial,
$self->{pct_directory} . '/' . $marshalled . '/' . $serial,
5, '>>');
return unless $lock_fh;
......@@ -312,8 +324,9 @@ sub _get_recipient_tabs_by_domain {
# Old name: Bulk::there_is_too_much_remaining_packets().
sub too_much_remaining_packets {
Log::do_log('debug3', '');
my $remaining_packets = scalar @{$metadatas || []};
my $self = shift;
my $remaining_packets = scalar @{$self->{metadatas} || []};
if ($remaining_packets > Conf::get_robot_conf('*', 'bulk_fork_threshold'))
{
return $remaining_packets;
......@@ -339,14 +352,21 @@ TBD
L<Sympa::Bulk> implements the spool for bulk sending.
=head2 Functions
=head2 Methods
=over
=item new ( )
I<Constructor>.
Creates new instance of L<Sympa::Bulk>.
=item next ( )
Gets next packet to process, order is controled by priority, then by
packet_priority, then by delivery date, then by reception date.
I<Instance method>.
Gets next packet to process, order is controled by message priority, then by
packet priority, then by delivery date, then by reception date.
Packets with future delivery date are ignored.
Packet will be locked to prevent multiple proccessing of a single packet.
Parameters:
......@@ -360,6 +380,7 @@ a packet.
=item quarantine ( $handle )
I<Instance method>.
Quarantines a packet.
Packet will be moved into bad/ subdirectory of the spool.
......@@ -380,6 +401,7 @@ Otherwise false value.
=item remove ( $handle )
I<Instance method>.
Removes a packet.
If the packet is the last one of bulk sending,
corresponding message will also be removed from spool.
......@@ -402,6 +424,7 @@ Otherwise false value.
=item store ( $message, $rcpt, [ original =E<gt> $original ],
[ tag =E<gt> $tag ] )
I<Instance method>.
Stores the message into message spool.
Recipients will be splitted into multiple packets and
stored into packet spool.
......@@ -412,14 +435,31 @@ Parameters:
=item $message
Message to be stored.
Message to be stored. Following attributes are referred:
=over
{envelope_sender} attribute of the message will be used as SMTP "MAIL FROM:"
field.
=item {envelope_sender}
SMTP "MAIL FROM:" field.
=item {priority}
Message priority.
=item {packet_priority}
Packet priority.
=item {date}
Unix time when the message would be delivered.
=back
=item $rcpt
Scalar, scalarref or arrayref, for SMTP "RCPT TO:" field.
Scalar, scalarref or arrayref, for SMTP "RCPT TO:" field(s).
=item original =E<gt> $original
......@@ -438,6 +478,7 @@ Otherwise C<undef>.
=item too_much_remaining_packets ( )
I<Instance method>.
Returns true value if the number of remaining packets exceeds
the value of the C<bulk_fork_threshold> config parameter.
......@@ -458,4 +499,3 @@ Rewritten L<Sympa::Bulk> appeared on Sympa 6.2, using spools based on
filesystem.
=cut
......@@ -43,6 +43,7 @@ use XML::LibXML;
use Sympa::Archive;
use Sympa::Auth;
use Sympa::Bulk;
use Conf;
use Sympa::ConfDef;
use Sympa::Constants;
......@@ -2248,7 +2249,7 @@ sub _mail_message {
# Overwrite original envelope sender. It is REQUIRED for delivery.
$message->{envelope_sender} = $list->get_list_address('return_path');
return Sympa::Bulk::store($message, $rcpt, tag => $tag)
return Sympa::Bulk->new->store($message, $rcpt, tag => $tag)
|| undef;
}
......@@ -2607,7 +2608,7 @@ sub send_dsn {
# Set envelope sender. DSN _must_ have null envelope sender.
$dsn_message->{envelope_sender} = '<>';
}
unless ($dsn_message and Sympa::Bulk::store($dsn_message, $sender)) {
unless ($dsn_message and Sympa::Bulk->new->store($dsn_message, $sender)) {
Log::do_log('err', 'Unable to send DSN to %s', $sender);
return undef;
}
......@@ -2948,6 +2949,7 @@ sub send_confirm_to_editor {
'auto_submitted' => 'auto-forwarded',
};
my $bulk = Sympa::Bulk->new;
foreach my $recipient (@rcpt) {
my $new_message = $message->dup;
if ($new_message->{'smime_crypted'}) {
......@@ -2997,7 +2999,7 @@ sub send_confirm_to_editor {
$confirm_message->{'date'} = time + 1;
}
unless ($confirm_message
and defined Sympa::Bulk::store($confirm_message, $recipient)) {
and defined $bulk->store($confirm_message, $recipient)) {
Log::do_log('notice', 'Unable to send template "moderate" to %s',
$recipient);
return undef;
......@@ -3083,7 +3085,7 @@ sub send_confirm_to_sender {
$confirm_message->{'date'} = time + 1;
}
unless ($confirm_message
and defined Sympa::Bulk::store($confirm_message, $sender)) {
and defined Sympa::Bulk->new->store($confirm_message, $sender)) {
Log::do_log('notice', 'Unable to send template "send_auth" to %s',
$sender);
return undef;
......@@ -3728,7 +3730,7 @@ sub send_probe_to_user {
$message->{priority} =
Conf::get_robot_conf($self->{'domain'}, 'sympa_priority');
}
unless ($message and defined Sympa::Bulk::store($message, $who)) {
unless ($message and defined Sympa::Bulk->new->store($message, $who)) {
Log::do_log('err', 'Could not send template %s to %s', $type, $who);
return undef;
}
......
......@@ -111,7 +111,7 @@ sub reject_report_msg {
$report_message->{'date'} = time + 1;
}
unless ($report_message
and defined Sympa::Bulk::store($report_message, $user)) {
and defined Sympa::Bulk->new->store($report_message, $user)) {
Log::do_log('notice',
'Unable to send template "message_report" to "%s"', $user);
}
......@@ -235,7 +235,7 @@ sub notice_report_msg {
$report_message->{'date'} = time + 1;
}
unless ($report_message
and defined Sympa::Bulk::store($report_message, $user)) {
and defined Sympa::Bulk->new->store($report_message, $user)) {
Log::do_log('notice',
'Unable to send template "message_report" to "%s"', $user);
}
......@@ -364,7 +364,7 @@ sub send_report_cmd {
$report_message->{'date'} = time + 1;
}
unless ($report_message
and defined Sympa::Bulk::store($report_message, $sender)) {
and defined Sympa::Bulk->new->store($report_message, $sender)) {
Log::do_log('notice',
'Unable to send template "command_report" to %s', $sender);
}
......
......@@ -40,6 +40,7 @@ use Time::HiRes qw();
use Sympa::Alarm;
use Sympa::Auth;
use Sympa::Bulk;
use Conf;
use Sympa::Constants;
use Sympa::Language;
......@@ -1063,7 +1064,7 @@ sub send_file {
Sympa::Message->new_from_template($that, $tpl, $who, $context,
%options);
unless ($message and defined Sympa::Bulk::store($message, $who)) {
unless ($message and defined Sympa::Bulk->new->store($message, $who)) {
Log::do_log('err', 'Could not send template %s to %s', $tpl, $who);
return undef;
}
......
......@@ -194,6 +194,8 @@ $options->{'multiple_process'} = 1;
$Conf::Conf{'maxsmtp'} =
int($Conf::Conf{'maxsmtp'} / $Conf::Conf{'bulk_max_count'});
my $bulk = Sympa::Bulk->new;
while (!$end) {
# Enable SMTP logging if required.
$mailer->{log_smtp} = $default_log_smtp;
......@@ -264,7 +266,7 @@ while (!$end) {
## Start new processes if there remain at least
## 'bulk_fork_threshold' packets to send in the bulkmailer_table
## table
if (my $r_packets = Sympa::Bulk::too_much_remaining_packets()
if (my $r_packets = $bulk->too_much_remaining_packets()
and scalar(@remaining_children) <
$Conf::Conf{'bulk_max_count'}) {
## disconnect from database before fork
......@@ -322,7 +324,7 @@ while (!$end) {
## number of remaining packets to send is reasonnable).
if ( $main::daemon_usage eq 'DAEMON_SLAVE'
and time() - $date_of_last_activity > $Conf::Conf{'bulk_lazytime'}
and !(my $r_packets = Sympa::Bulk::too_much_remaining_packets())) {
and !(my $r_packets = $bulk->too_much_remaining_packets())) {
Log::do_log('info',
'Process %s didn\'t send any message since %s seconds, exiting',
$PID, $Conf::Conf{'bulk_lazytime'});
......@@ -330,7 +332,7 @@ while (!$end) {
}
## Go through the bulk_mailer table and process messages
($message, $handle) = Sympa::Bulk::next();
($message, $handle) = $bulk->next();
if ($message and $handle) {
# Get list/robot context.
my ($list, $robot, $listname);
......@@ -454,7 +456,7 @@ while (!$end) {
tools::send_notify_to_listmaster($list, 'bulk_failed',
{'message_id' => $message->get_id});
# Quarantine packet into bad spool.
Sympa::Bulk::quarantine($handle);
$bulk->quarantine($handle);
last; # foreach $rcpt
}
delete $new_message->{shelved}{merge};
......@@ -475,7 +477,7 @@ while (!$end) {
$rcpt
);
# Quarantine packet into bad spool.
Sympa::Bulk::quarantine($handle);
$bulk->quarantine($handle);
last; # foreach $rcpt
}
delete $new_message->{shelved}{smime_encrypt};
......@@ -509,7 +511,7 @@ while (!$end) {
Log::do_log('err',
'Failed to store message %s into mailer: %m',
$new_message);
Sympa::Bulk::quarantine($handle);
$bulk->quarantine($handle);
undef $handle;
last; # foreach my $rcpt (@rcpts)
}
......@@ -558,14 +560,14 @@ while (!$end) {
Log::do_log('err',
'Failed to store message %s into mailer: %m',
$new_message);
Sympa::Bulk::quarantine($handle);
$bulk->quarantine($handle);
undef $handle;
}
}
## Remove packet once it has been processed
if ($handle) {
unless (Sympa::Bulk::remove($handle)) {
unless ($bulk->remove($handle)) {
Log::do_log('err', 'Failed to remove processed packet %s',
$message);
}
......@@ -574,7 +576,7 @@ while (!$end) {
$date_of_last_activity = time();
} elsif ($handle) {
Log::do_log('err', 'Cannot parse message <%s>', $handle->basename);
Sympa::Bulk::quarantine($handle);
$bulk->quarantine($handle);
next;
} else {
## Sleep for a while if bulk_mailer DB table is empty
......
......@@ -179,6 +179,7 @@ Log::do_log('notice', 'Task_Manager %s Started', Sympa::Constants::VERSION());
###### VARIABLES DECLARATION ######
my $bulk = Sympa::Bulk->new;
my $spool_task = $Conf::Conf{'queuetask'};
my $cert_dir = $Conf::Conf{'ssl_cert_dir'};
my @tasks; # list of tasks in the spool
......@@ -2001,8 +2002,7 @@ sub process_bouncers {
};
my $message = Sympa::Message->new_from_template($list,
'automatic_bounce_management', $to, $param);
unless ($message
and defined Sympa::Bulk::store($message, $rcpt)) {
unless ($message and defined $bulk->store($message, $rcpt)) {
Log::do_log(
'err',
'Unable to send template "automatic_bounce_management" to %s',
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment