Commit 61c9b884 authored by sikeda's avatar sikeda
Browse files

[*change] Spool for bulk sending no longer depends on database tables: it is now based on the filesystem.

Packets and messages stored in the database by earlier versions of Sympa should be migrated using the update_bulk_spool.pl utility.


git-svn-id: https://subversion.renater.fr/sympa/branches/sympa-6.2-branch@11556 05aa8bb8-cd2b-0410-b1d7-8918dfa770ce
parent 0e729701
......@@ -69,6 +69,7 @@ installdir:
@for dir in $(expldir) $(spooldir) $(spooldir)/msg $(spooldir)/digest \
$(spooldir)/moderation $(spooldir)/expire $(spooldir)/auth \
$(spooldir)/outgoing $(spooldir)/tmp $(spooldir)/task \
$(spooldir)/bulk \
$(bouncedir) $(arcdir) $(piddir) $(staticdir) $(staticdir)/js $(sysconfdir); do \
if [ ! -d $(DESTDIR)$$dir ] ; then \
echo "Creating $(DESTDIR)$$dir"; \
......
......@@ -711,7 +711,7 @@ sub checkfiles {
'queueauth', 'queueoutgoing',
'queuebounce', 'queuesubscribe',
'queuetask', 'queuedistribute',
'tmpdir'
'queuebulk', 'tmpdir'
) {
unless (-d $Conf{$qdir}) {
Log::do_log('info', 'Creating spool %s', $Conf{$qdir});
......@@ -734,7 +734,8 @@ sub checkfiles {
}
## Also create associated bad/ spools
foreach my $qdir ('queue', 'queuedistribute', 'queueautomatic') {
foreach
my $qdir ('queue', 'queuedistribute', 'queueautomatic', 'queuebulk') {
my $subdir = $Conf{$qdir} . '/bad';
unless (-d $subdir) {
Log::do_log('info', 'Creating spool %s', $subdir);
......
This diff is collapsed.
......@@ -677,7 +677,7 @@ our @params = (
},
{ 'name' => 'queue',
'default' => Sympa::Constants::SPOOLDIR . '/msg',
'gettext_id' => 'Directory for incoming spool',
'gettext_id' => 'Directory for message incoming spool',
'file' => 'sympa.conf',
},
{ 'name' => 'queuedistribute',
......@@ -701,7 +701,7 @@ our @params = (
},
{ 'name' => 'queueoutgoing',
'default' => Sympa::Constants::SPOOLDIR . '/outgoing',
'gettext_id' => 'Directory for outgoing spool',
'gettext_id' => 'Directory for archive spool',
'file' => 'sympa.conf',
},
{ 'name' => 'queuesubscribe',
......@@ -730,6 +730,11 @@ our @params = (
'gettext_id' => 'Directory for automatic list creation spool',
'file' => 'sympa.conf',
},
{ 'name' => 'queuebulk',
'default' => Sympa::Constants::SPOOLDIR . '/bulk',
'gettext_id' => 'Directory for message outgoing spool',
'file' => 'sympa.conf',
},
{ 'name' => 'sleep',
'default' => '5',
'gettext_comment' => 'Must not be 0.',
......@@ -770,6 +775,9 @@ our @params = (
{ 'name' => 'clean_delay_queueautomatic',
'default' => '10',
},
{ 'name' => 'clean_delay_queuebulk',
'default' => '7',
},
{ 'name' => 'clean_delay_tmpdir',
'default' => '7',
},
......
......@@ -214,157 +214,157 @@ my %full_db_struct = (
'The user_table is mainly used to manage login from web interface. A subscriber may not appear in the user_table if he never log through the web interface.',
'order' => 2,
},
'bulkspool_table' => {
'fields' => {
'messagekey_bulkspool' => {
'struct' => 'varchar(33)',
'doc' => 'primary key',
'primary' => 1,
'not_null' => 1,
'order' => 1,
},
'message_bulkspool' => {
'struct' => 'longtext',
'doc' => 'message as string b64 encoded',
'order' => 2,
},
#'messageid_bulkspool' => {
# 'struct' => 'varchar(300)',
# 'doc' => 'stored to list spool content faster',
# 'order' => 4,
#},
'lock_bulkspool' => {
'struct' => 'int(1)',
'doc' =>
'when set to 1, this field prevents Sympa from processing the message',
'order' => 5,
},
#'dkim_privatekey_bulkspool' => {
# 'struct' => 'varchar(2000)',
# 'doc' =>
# 'DKIM parameter stored for bulk daemon because bulk ignore list parameters, private key to sign message',
# 'order' => 6,
#},
#'dkim_selector_bulkspool' => {
# 'struct' => 'varchar(50)',
# 'doc' =>
# 'DKIM parameter stored for bulk daemon because bulk ignore list parameters, DKIM selector to sign message',
# 'order' => 7,
#},
#'dkim_d_bulkspool' => {
# 'struct' => 'varchar(50)',
# 'doc' =>
# 'DKIM parameter stored for bulk daemon because bulk ignore list parameters, the d DKIM parameter',
# 'order' => 8,
#},
#'dkim_i_bulkspool' => {
# 'struct' => $email_struct,
# 'doc' =>
# 'DKIM parameter stored for bulk daemon because bulk ignore list parameters, DKIM i signature parameter',
# 'order' => 9,
#},
},
'doc' => 'This table contains the messages to be sent by bulk.pl',
'order' => 3,
},
'bulkmailer_table' => {
'fields' => {
'messagekey_bulkmailer' => {
'struct' => 'varchar(80)',
'doc' =>
'A pointer to a message in spool_table.It must be a value of a line in table spool_table with same value as messagekey_bulkspool',
'primary' => 1,
'not_null' => 1,
'order' => 1,
},
'packetid_bulkmailer' => {
'struct' => 'varchar(33)',
'doc' => 'An id for the packet',
'primary' => 1,
'not_null' => 1,
'order' => 2,
},
#'messageid_bulkmailer' => {
# 'struct' => 'varchar(200)',
# 'doc' => 'The message Id',
# 'order' => 3,
#},
##FIXME: column name is "recEipients_bulkmailer"
'receipients_bulkmailer' => {
'struct' => 'text',
'doc' =>
'the comma separated list of recipient email for this message',
'order' => 4,
},
#'returnpath_bulkmailer' => {
# 'struct' => $email_struct,
# 'doc' =>
# 'the return path value that must be set when sending the message',
# 'order' => 5,
#},
'robot_bulkmailer' => {
'struct' => $robot_struct,
'doc' => '',
'order' => 6,
},
'listname_bulkmailer' => {
'struct' => $list_struct,
'doc' => '',
'order' => 7,
},
#'verp_bulkmailer' => {
# 'struct' => 'int(1)',
# 'doc' =>
# 'A boolean to specify if VERP is requiered, in this case return_path will be formatted using VERP form',
# 'order' => 8,
#},
#'tracking_bulkmailer' => {
# 'struct' => "enum('mdn','dsn')",
# 'doc' => 'Is DSN or MDN requiered when sending this message?',
# 'order' => 9,
#},
#'merge_bulkmailer' => {
# 'struct' => 'int(1)',
# 'doc' =>
# 'Boolean, if true, the message is to be parsed as a TT2 template foreach recipient',
# 'order' => 10,
#},
'priority_message_bulkmailer' => {
'struct' => 'smallint(10)',
'doc' => 'FIXME',
'order' => 11,
},
'priority_packet_bulkmailer' => {
'struct' => 'smallint(10)',
'doc' => 'FIXME',
'order' => 12,
},
'reception_date_bulkmailer' => {
'struct' => 'double',
'doc' => 'The date where the message was received',
'order' => 13,
},
'delivery_date_bulkmailer' => {
'struct' => 'int(11)',
'doc' => 'The date the message was sent',
'order' => 14,
},
'lock_bulkmailer' => {
'struct' => 'varchar(30)',
'doc' =>
'A lock. It is set as process-number @ hostname so multiple bulkmailer can handle this spool',
'order' => 15,
},
'tag_bulkmailer' => {
'struct' => 'varchar(10)',
'doc' => 'Additional tag used to sort packets',
'order' => 16,
},
},
'doc' =>
'storage of recipients with a ref to a message in spool_table. So a very simple process can distribute them',
'order' => 4,
},
#'bulkspool_table' => {
# 'fields' => {
# 'messagekey_bulkspool' => {
# 'struct' => 'varchar(33)',
# 'doc' => 'primary key',
# 'primary' => 1,
# 'not_null' => 1,
# 'order' => 1,
# },
# 'message_bulkspool' => {
# 'struct' => 'longtext',
# 'doc' => 'message as string b64 encoded',
# 'order' => 2,
# },
# #'messageid_bulkspool' => {
# # 'struct' => 'varchar(300)',
# # 'doc' => 'stored to list spool content faster',
# # 'order' => 4,
# #},
# 'lock_bulkspool' => {
# 'struct' => 'int(1)',
# 'doc' =>
# 'when set to 1, this field prevents Sympa from processing the message',
# 'order' => 5,
# },
# #'dkim_privatekey_bulkspool' => {
# # 'struct' => 'varchar(2000)',
# # 'doc' =>
# # 'DKIM parameter stored for bulk daemon because bulk ignore list parameters, private key to sign message',
# # 'order' => 6,
# #},
# #'dkim_selector_bulkspool' => {
# # 'struct' => 'varchar(50)',
# # 'doc' =>
# # 'DKIM parameter stored for bulk daemon because bulk ignore list parameters, DKIM selector to sign message',
# # 'order' => 7,
# #},
# #'dkim_d_bulkspool' => {
# # 'struct' => 'varchar(50)',
# # 'doc' =>
# # 'DKIM parameter stored for bulk daemon because bulk ignore list parameters, the d DKIM parameter',
# # 'order' => 8,
# #},
# #'dkim_i_bulkspool' => {
# # 'struct' => $email_struct,
# # 'doc' =>
# # 'DKIM parameter stored for bulk daemon because bulk ignore list parameters, DKIM i signature parameter',
# # 'order' => 9,
# #},
# },
# 'doc' => 'This table contains the messages to be sent by bulk.pl',
# 'order' => 3,
#},
#'bulkmailer_table' => {
# 'fields' => {
# 'messagekey_bulkmailer' => {
# 'struct' => 'varchar(80)',
# 'doc' =>
# 'A pointer to a message in spool_table.It must be a value of a line in table spool_table with same value as messagekey_bulkspool',
# 'primary' => 1,
# 'not_null' => 1,
# 'order' => 1,
# },
# 'packetid_bulkmailer' => {
# 'struct' => 'varchar(33)',
# 'doc' => 'An id for the packet',
# 'primary' => 1,
# 'not_null' => 1,
# 'order' => 2,
# },
# #'messageid_bulkmailer' => {
# # 'struct' => 'varchar(200)',
# # 'doc' => 'The message Id',
# # 'order' => 3,
# #},
# ##FIXME: column name is "recEipients_bulkmailer"
# 'receipients_bulkmailer' => {
# 'struct' => 'text',
# 'doc' =>
# 'the comma separated list of recipient email for this message',
# 'order' => 4,
# },
# #'returnpath_bulkmailer' => {
# # 'struct' => $email_struct,
# # 'doc' =>
# # 'the return path value that must be set when sending the message',
# # 'order' => 5,
# #},
# 'robot_bulkmailer' => {
# 'struct' => $robot_struct,
# 'doc' => '',
# 'order' => 6,
# },
# 'listname_bulkmailer' => {
# 'struct' => $list_struct,
# 'doc' => '',
# 'order' => 7,
# },
# #'verp_bulkmailer' => {
# # 'struct' => 'int(1)',
# # 'doc' =>
# # 'A boolean to specify if VERP is requiered, in this case return_path will be formatted using VERP form',
# # 'order' => 8,
# #},
# #'tracking_bulkmailer' => {
# # 'struct' => "enum('mdn','dsn')",
# # 'doc' => 'Is DSN or MDN requiered when sending this message?',
# # 'order' => 9,
# #},
# #'merge_bulkmailer' => {
# # 'struct' => 'int(1)',
# # 'doc' =>
# # 'Boolean, if true, the message is to be parsed as a TT2 template foreach recipient',
# # 'order' => 10,
# #},
# 'priority_message_bulkmailer' => {
# 'struct' => 'smallint(10)',
# 'doc' => 'FIXME',
# 'order' => 11,
# },
# 'priority_packet_bulkmailer' => {
# 'struct' => 'smallint(10)',
# 'doc' => 'FIXME',
# 'order' => 12,
# },
# 'reception_date_bulkmailer' => {
# 'struct' => 'double',
# 'doc' => 'The date where the message was received',
# 'order' => 13,
# },
# 'delivery_date_bulkmailer' => {
# 'struct' => 'int(11)',
# 'doc' => 'The date the message was sent',
# 'order' => 14,
# },
# 'lock_bulkmailer' => {
# 'struct' => 'varchar(30)',
# 'doc' =>
# 'A lock. It is set as process-number @ hostname so multiple bulkmailer can handle this spool',
# 'order' => 15,
# },
# 'tag_bulkmailer' => {
# 'struct' => 'varchar(10)',
# 'doc' => 'Additional tag used to sort packets',
# 'order' => 16,
# },
# },
# 'doc' =>
# 'storage of recipients with a ref to a message in spool_table. So a very simple process can distribute them',
# 'order' => 4,
#},
'exclusion_table' => {
'fields' => {
'list_exclusion' => {
......
......@@ -129,6 +129,15 @@ sub extend {
return 1;
}
# Returns the last path component of the file locked by this object.
# Dies with 'Lock not found' when no lock is registered for the object.
sub basename {
    my $self = shift;

    # Locks are registered under the numeric form of the object.
    my $lock_key = $self + 0;
    die 'Lock not found' unless exists $lock_of{$lock_key};

    my @components = split '/', $lock_of{$lock_key}->{file};
    return $components[-1];
}
sub rename {
my $self = shift;
my $destfile = shift;
......@@ -324,6 +333,10 @@ Following methods are specific to this module.
=over
=item basename ( )
Gets base name of locked file.
=item extend ( )
Extends stale lock timeout.
......
......@@ -82,7 +82,7 @@ sub reaper {
Log::do_log(
'debug2',
'Reaper unwaited pids: %s Open = %s',
join(' ', sort {$a <=> $b} keys %{$self->{pids}}),
join(' ', sort { $a <=> $b } keys %{$self->{pids}}),
$self->{opensmtp}
);
return $i;
......@@ -104,8 +104,8 @@ sub store {
my $return_path = $message->{envelope_sender};
my $envid = $params{envid};
my $logging = $params{logging};
$logging = 1 unless defined $logging;
my $tag = $params{tag};
my $logging = (not defined $tag or $tag eq 's' or $tag eq 'z') ? 1 : 0;
my $robot_id;
if (ref $message->{context} eq 'Sympa::List') {
......@@ -138,14 +138,15 @@ sub store {
while (@all_rcpt) {
# Split rcpt by max length of command line (_SC_ARG_MAX).
my $cmd_size = $min_cmd_size + 1 + length($all_rcpt[0]);
my @rcpt = (shift @all_rcpt);
my @rcpt = (shift @all_rcpt);
while (@all_rcpt
and ($cmd_size += 1 + length($all_rcpt[0])) <= $max_arg) {
push @rcpt, (shift @all_rcpt);
}
my $pipeout = $self->_get_sendmail_handle(
$return_path, [@rcpt], $robot_id, $envid);
my $pipeout =
$self->_get_sendmail_handle($return_path, [@rcpt], $robot_id,
$envid);
print $pipeout $msg_string;
unless (close $pipeout) {
return undef;
......@@ -226,7 +227,7 @@ sub _get_sendmail_handle {
if ($self->{log_smtp}) {
Log::do_log(
'debug3', '%s %s -f \'%s\' -- %s',
$sendmail, join(' ', @sendmail_args),
$sendmail, join(' ', @sendmail_args),
$return_path, join(' ', @$rcpt)
);
}
......@@ -322,7 +323,7 @@ Returns:
PID.
=item store ( $message, $rcpt,
[ envid =E<gt> $envid ], [ logging =E<gt> $logging ] )
[ envid =E<gt> $envid ], [ tag =E<gt> $tag ] )
I<Instance method>.
Makes a sendmail ready for the recipients given as argument, uses a file
......@@ -349,7 +350,7 @@ Scalar, scalarref or arrayref, for SMTP "RCPT TO:" field.
An envelope ID of this message submission in notification table.
See also L<Sympa::Tracking>.
=item $logging
=item $tag
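Tag of the packet.
Logging is enabled when the tag is undefined, C<'s'> or C<'z'>,
and disabled otherwise.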
......
......@@ -3761,6 +3761,18 @@ Domain, type and local part of context.
Priority of the message.
=item {tag}
Tag of packet used by bulk spool to control logging.
C<'0'> is the first message of multiple packets.
C<'z'> is the last.
C<'s'> is a single message with a single packet.
=item {time}
The Unix time in floating point number when the message was stored into the
spool. This is used by bulk spool.
=back
=head2 Attributes
......@@ -3780,7 +3792,7 @@ Envelope sender, a.k.a. "Unix From".
This is not always same as {sender} attribute
nor the content of C<From:> field.
C<'E<lt>E<gt>'> is used for "null envelope sender".
C<'E<lt>E<gt>'> will be used for "null envelope sender".
=item {family}
......@@ -3795,14 +3807,15 @@ Display name of actual sender (see {sender} below), if any.
True value indicates that the message has been authenticated by C<md5> level
(password authentication).
This is set by web mailer of WWSympa and used by incoming spool.
=item {message_id}
Original message ID of the message
Original message ID of the message.
=item {rcpt}
Currently unavailable.
Recipients for delivery. This is used by bulk spool.
=item {sender}
......@@ -3886,10 +3899,10 @@ Below is an example of serialized form.
Bonjour, le monde. : |
: ---
On the messages in msg and bounce spools,
C<Return-Path:> header fields are given by MDA,
and C<X-Sympa-*:> header fields are given by queue programs. On other
spools, they are given by components of Sympa.
On msg, automatic and bounce spools,
C<Return-Path:> header fields are given by MDA
and C<X-Sympa-*:> header fields are given by queue programs.
On other spools, they are given by components of Sympa.
Pseudo-header fields I<should> appear at beginning of serialized content.
Fields appear at other places (e.g. C<X-Sympa-Topic:> field above) are not
......
......@@ -240,9 +240,8 @@ sub send_crash_report {
# Returns a lock name that uniquely identifies a process (hostname + PID).
# The hostname is truncated to 20 characters and the combined string to 30
# characters, so that the lock name can be stored in a database varchar(30)
# column.
sub get_lockname {
return substr(substr(Sys::Hostname::hostname(), 0, 20) . $PID, 0, 30);
}
# DEPRECATED: No longer used.
#sub get_lockname();
## Returns the list of pid identifiers in the pid file.
sub get_pids_in_pid_file {
......
......@@ -201,7 +201,7 @@ while (!$end) {
# Process grouped notifications
Sympa::Alarm::flush();
my $bulk;
my ($message, $handle);
unless ($main::options{'foreground'}) {
##
......@@ -263,9 +263,8 @@ while (!$end) {
## table
if (my $r_packets =
Sympa::Bulk::there_is_too_much_remaining_packets()
and $#remaining_children + 1 < $Conf::Conf{'bulk_max_count'})
{
and scalar(@remaining_children) <
$Conf::Conf{'bulk_max_count'}) {
## disconnect from database before fork
## to prevent DB handlers to be shared by different processes
# when loading conf in database disconnect because of sharing
......@@ -278,13 +277,14 @@ while (!$end) {
'Important workload: %s packets to process. Creating %s child bulks to increase sending rate',
$r_packets,
$Conf::Conf{'bulk_max_count'} -
($#remaining_children + 1)
scalar(@remaining_children)
);
for my $process_count (
1 .. $Conf::Conf{'bulk_max_count'} -
($#remaining_children + 1)) {
scalar(@remaining_children)) {
Log::do_log('info', "Will fork: %s", $process_count);
if ((my $child_pid = fork) != 0) {
my $child_pid = fork;
if ($child_pid) {
Log::do_log('info',
'Starting bulk child daemon, PID %s',
$child_pid);
......@@ -293,14 +293,16 @@ while (!$end) {
$child_pid, $options);
$created_children{$child_pid} = 1;
sleep 1;
} elsif (not defined $child_pid) {
Log::do_log('err', 'Cannot fork: %m');
last;
} else {
## We're in a child bulk process
close STDERR;
Sympa::Tools::Daemon::direct_stderr_to_file(
('pid' => $PID));
$date_of_last_activity = time();
$main::daemon_usage =
'DAEMON_SLAVE'; # automatic lists creation
$main::daemon_usage = 'DAEMON_SLAVE'; # child bulk
Log::do_openlog($Conf::Conf{'syslog'},
$Conf::Conf{'log_socket_type'}, 'bulk');
Log::do_log('info',
......@@ -328,48 +330,21 @@ while (!$end) {
}
## Go through the bulk_mailer table and process messages
if ($bulk = Sympa::Bulk::next()) {
($message, $handle) = Sympa::Bulk::next();
if ($message and $handle) {
# Get list/robot context.
# NOTE: The robot that injected the packet may no longer be available.
# In such a case, fall back to the site-wide robot.
my $listname = $bulk->{'listname'};
my $robot = $bulk->{'robot'};
my $list;