Commit bbd2a5f0 authored by sikeda's avatar sikeda
Browse files

[dev] Refactoring. Introducing Sympa::Spool::Incoming class for incoming (msg) spool.


git-svn-id: https://subversion.renater.fr/sympa/branches/sympa-6.2-branch@12345 05aa8bb8-cd2b-0410-b1d7-8918dfa770ce
parent 869781a3
......@@ -37,6 +37,7 @@ use Conf;
use Sympa::LockedFile;
use Sympa::Log;
use Sympa::Spool;
use Sympa::Spool::Incoming;
my %options;
unless (GetOptions(\%options, 'help|h', 'dry_run', 'version|v')) {
......@@ -71,57 +72,36 @@ unless (($GID == (getgrnam(Sympa::Constants::GROUP))[2])
umask oct $Conf::Conf{'umask'};
my $bulk = Sympa::Bulk->new;
my $spool_dir = $Conf::Conf{'queue'};
my $spool = Sympa::Spool::Incoming->new;
my $spool_dir = $spool->{directory};
mkdir "$spool_dir/bad", 0755 unless -d "$spool_dir/bad";
mkdir "$spool_dir/moved", 0755 unless -d "$spool_dir/moved";
my ($dh, $filename);
unless (opendir $dh, $spool_dir) {
printf STDERR "Cannot open directory %s.\n", $spool_dir;
exit 1;
}
while ($filename = readdir $dh) {
next unless -f "$spool_dir/$filename";
next if $filename =~ /,lock/; # Skip lock.
next unless /\AT\./;
next unless /\ABAD-/;
next unless /\A\.+/;
my $lock_fh =
Sympa::LockedFile->new($spool_dir . '/' . $filename, -1, '+<');
next unless $lock_fh;
my $metadata = Sympa::Spool::unmarshal_metadata(
$spool_dir, $filename,
qr{\A([^\s\@]+)(?:\@([\w\.\-]+))?\.(\d+)\.(\w+)(?:,.*)?\z},
[qw(localpart domainpart date pid)]
);
unless ($metadata) {
$lock_fh->close;
next;
}
my $msg_string = do { local $RS; <$lock_fh> };
my $message = Sympa::Message->new($msg_string, %$metadata);
unless ($message and $message->{checksum}) {
$lock_fh->close;
while (1) {
my ($message, $handle) = $spool->next;
if ($message and $handle) {
my $status = process($message);
unless (defined $status) {
$spool->quarantine($handle) unless $options{dry_run};
} elsif ($status) {
$handle->rename($spool_dir . '/moved/' . $handle->basename)
unless $options{dry_run};
} else {
next;
}
} elsif ($handle) {
next;
}
unless (process($message)) {
$lock_fh->rename($spool_dir . '/bad/' . $filename)
unless $options{dry_run};
} else {
$lock_fh->rename($spool_dir . '/moved/' . $filename)
unless $options{dry_run};
last;
}
}
closedir $dh;
sub process {
my $message = shift;
return 0 unless $message->{checksum};
## valid X-Sympa-Checksum prove the message comes from web interface with
## authenticated sender
unless ($message->{'checksum'} eq sympa_checksum($message->{'rcpt'})) {
......@@ -134,6 +114,7 @@ sub process {
$message->{'md5_check'} = 1;
delete $message->{checksum};
# Don't use method of incoming spool to preserve original PID.
Sympa::Spool::store_spool($spool_dir, $message, '%s@%s.%ld.%ld,%d',
[qw(localpart domainpart date pid RAND)])
unless $options{dry_run};
......
......@@ -72,8 +72,8 @@ use Sympa::Robot;
use Sympa::Scenario;
use Sympa::Session;
use Sympa::SharedDocument;
use Sympa::Spool;
use Sympa::Spool::Archive;
use Sympa::Spool::Incoming;
use Sympa::Template;
use tools;
use Sympa::Tools::Data;
......@@ -9141,12 +9141,7 @@ sub do_distribute {
);
$message->add_header('Content-Type', 'text/plain; Charset=utf-8');
 
unless (
Sympa::Spool::store_spool(
$Conf::Conf{'queue'}, $message,
'%s@%s.%ld.%ld,%d', [qw(localpart domainpart TIME PID RAND)]
)
) {
unless (Sympa::Spool::Incoming->new->store($message)) {
Sympa::Report::reject_report_web(
'intern',
'cannot_send_distribute',
......@@ -20337,12 +20332,7 @@ sub do_remind {
);
$message->add_header('Content-Type', 'text/plain; Charset=utf-8');
 
unless (
Sympa::Spool::store_spool(
$Conf::Conf{'queue'}, $message,
'%s@%s.%ld.%ld,%d', [qw(localpart domainpart TIME PID RAND)]
)
) {
unless (Sympa::Spool::Incoming->new->store($message)) {
Sympa::Report::reject_report_web(
'intern',
'cannot_send_remind',
......@@ -21712,12 +21702,7 @@ sub do_send_mail {
$l_message->{sender} = $param->{'user'}{'email'};
$l_message->{md5_check} = 1;
 
unless (
Sympa::Spool::store_spool(
$Conf::Conf{'queue'}, $l_message, '%s@%s.%ld.%ld,%d',
[qw(localpart domainpart TIME PID RAND)]
)
) {
unless (Sympa::Spool::Incoming->new->store($l_message)) {
Sympa::Report::reject_report_web(
'intern',
'cannot_send_mail',
......@@ -21916,12 +21901,7 @@ sub do_tag_topic_by_sender {
);
$cmd_message->add_header('Content-Type', 'text/plain; Charset=utf-8');
 
unless (
Sympa::Spool::store_spool(
$Conf::Conf{'queue'}, $cmd_message,
'%s@%s.%ld.%ld,%d', [qw(localpart domainpart TIME PID RAND)]
)
) {
unless (Sympa::Spool::Incoming->new->store($cmd_message)) {
Sympa::Report::reject_report_web(
'intern',
'cannot_send_mail',
......
......@@ -800,9 +800,9 @@ sub checkfiles {
}
foreach my $qdir (
'spool', 'queue', 'queueautomatic', 'queuedigest',
'queuemod', 'queuetopic', 'queueauth', 'queuebounce',
'queuesubscribe', 'queuetask', 'tmpdir'
'spool', 'queueautomatic', 'queuedigest', 'queuemod',
'queuetopic', 'queueauth', 'queuebounce', 'queuesubscribe',
'queuetask', 'tmpdir'
) {
unless (-d $Conf{$qdir}) {
$log->syslog('info', 'Creating spool %s', $Conf{$qdir});
......@@ -826,7 +826,7 @@ sub checkfiles {
}
## Also create associated bad/ spools
foreach my $qdir ('queue', 'queueautomatic', 'queuebounce') {
foreach my $qdir ('queueautomatic', 'queuebounce') {
my $subdir = $Conf{$qdir} . '/bad';
unless (-d $subdir) {
$log->syslog('info', 'Creating spool %s', $subdir);
......
......@@ -77,6 +77,7 @@ nobase_modules_DATA = \
Sympa/SOAP/Transport.pm \
Sympa/Spool.pm \
Sympa/Spool/Archive.pm \
Sympa/Spool/Incoming.pm \
Sympa/Task.pm \
Sympa/Template.pm \
tools.pm \
......
# -*- indent-tabs-mode: nil; -*-
# vim:ft=perl:et:sw=4
# $Id$
# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright (c) 1997, 1998, 1999 Institut Pasteur & Christophe Wolfhugel
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015 GIP RENATER
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
package Sympa::Spool::Incoming;
use strict;
use warnings;
use English qw(-no_match_vars);
use Conf;
use Sympa::Constants;
use Sympa::LockedFile;
use Sympa::Log;
use Sympa::Message;
use Sympa::Spool;
use Sympa::Tools::File;
my $log = Sympa::Log->instance;
sub new {
my $class = shift;
my $self = bless {
directory => $Conf::Conf{'queue'},
bad_directory => $Conf::Conf{'queue'} . '/bad',
_metadatas => undef,
_highest_priority => 'z',
} => $class;
$self->_create_spool;
return $self;
}
sub _create_spool {
my $self = shift;
my $umask = umask oct $Conf::Conf{'umask'};
foreach my $directory ($self->{directory}, $self->{bad_directory}) {
unless (-d $directory) {
$log->syslog('info', 'Creating spool %s', $directory);
unless (
mkdir($directory, 0755)
and Sympa::Tools::File::set_file_rights(
file => $directory,
user => Sympa::Constants::USER(),
group => Sympa::Constants::GROUP()
)
) {
die sprintf 'Cannot create %s: %s', $directory, $ERRNO;
}
}
}
umask $umask;
}
sub next {
my $self = shift;
return unless $self->{directory};
unless ($self->{_metadatas}) {
my $dh;
unless (opendir $dh, $self->{directory}) {
die sprintf 'Cannot open dir %s: %s', $self->{directory}, $ERRNO;
}
$self->{_metadatas} = [
sort grep {
!/,lock/
and !m{(?:\A|/)(?:\.|T\.|BAD-)}
and -f ($self->{directory} . '/' . $_)
} readdir $dh
];
closedir $dh;
# Sort specific to this spool.
my %mtime =
map {
( $_ => Sympa::Tools::File::get_mtime(
$self->{directory} . '/' . $_
)
)
} @{$self->{_metadatas}};
$self->{_metadatas} =
[sort { $mtime{$a} <=> $mtime{$b} } @{$self->{_metadatas}}];
}
unless (@{$self->{_metadatas}}) {
undef $self->{_metadatas};
# Specific to this spool.
$self->{_highest_priority} = 'z';
return;
}
while (my $marshalled = shift @{$self->{_metadatas}}) {
my ($lock_fh, $metadata, $message);
# Try locking message. Those locked or removed by other process will
# be skipped.
$lock_fh =
Sympa::LockedFile->new($self->{directory} . '/' . $marshalled,
-1, '+<');
next unless $lock_fh;
$metadata = Sympa::Spool::unmarshal_metadata(
$self->{directory},
$marshalled,
qr{\A([^\s\@]+)(?:\@([\w\.\-]+))?\.(\d+)\.(\w+)(?:,.*)?\z},
[qw(localpart domainpart date pid rand)]
);
# Filter specific to this spool.
# - z and Z are a null priority, so file stay in queue and are
# processed only if renamed by administrator
next if $metadata and lc($metadata->{priority} || '') eq 'z';
# - Lazily seek highest priority: Messages with lower priority than
# those already found are skipped.
if (length($metadata->{priority} || '')) {
next if $self->{_highest_priority} lt $metadata->{priority};
$self->{_highest_priority} = $metadata->{priority};
}
if ($metadata) {
my $msg_string = do { local $RS; <$lock_fh> };
$message = Sympa::Message->new($msg_string, %$metadata);
}
# Though message might not be deserialized, anyway return the result.
return ($message, $lock_fh);
}
return;
}
sub quarantine {
my $self = shift;
my $lock_fh = shift;
my $bad_file;
$bad_file = $self->{'bad_directory'} . '/' . $lock_fh->basename;
unless (-d $self->{bad_directory} and $lock_fh->rename($bad_file)) {
$bad_file = $self->{directory} . '/BAD-' . $lock_fh->basename;
return undef unless $lock_fh->rename($bad_file);
}
return 1;
}
sub remove {
my $self = shift;
my $lock_fh = shift;
return $lock_fh->unlink;
}
sub store {
my $self = shift;
my $message = shift->dup;
my %options = @_;
$message->{date} = time unless defined $message->{date};
my $marshalled =
Sympa::Spool::store_spool($self->{directory}, $message,
'%s@%s.%ld.%ld,%d', [qw(localpart domainpart date PID RAND)],
%options);
return unless $marshalled;
$log->syslog('notice', 'Message %s is stored into archive spool as <%s>',
$message, $marshalled);
return $marshalled;
}
1;
__END__
=encoding utf-8
=head1 NAME
Sympa::Spool::Incoming - Spool for incoming messages
=head1 SYNOPSIS
use Sympa::Spool::Incoming;
my $spool = Sympa::Spool::Incoming->new;
$spool->store($message);
my ($message, $handle) = $spool->next;
=head1 DESCRIPTION
L<Sympa::Spool::Incoming> implements the spool for incoming messages.
Note:
In most cases, queue(8) program stores messages to incoming spool.
=head2 Methods
=over
=item new ( )
I<Constructor>.
Creates new instance of L<Sympa::Spool::Incoming>.
=item next ( )
I<Instance method>.
Gets next message to process, order is controled by delivery date, then
messages with possiblly higher priority are chosen.
Message will be locked to prevent multiple proccessing of a single message.
Parameters:
None.
Returns:
Two-elements list of L<Sympa::Message> instance and filehandle locking
a message.
=item quarantine ( $handle )
I<Instance method>.
Quarantines a message.
Message will be moved into bad/ subdirectory of the spool.
Parameter:
=over
=item $handle
Filehandle, L<Sympa::LockedFile> instance, locking message.
=back
Returns:
True value if message could be quarantined.
Otherwise false value.
=item remove ( $handle )
I<Instance method>.
Removes a message.
Parameter:
=over
=item $handle
Filehandle, L<Sympa::LockedFile> instance, locking message.
=back
Returns:
True value if message could be removed.
Otherwise false value.
=item store ( $message, [ original =E<gt> $original ] )
I<Instance method>.
Stores the message into spool.
Parameters:
=over
=item $message
Message to be stored. Following attributes and metadata are referred:
=over
=item {sender}
Sender of the message.
=item {date}
Unix time when the message would be delivered.
=item {time}
Unix time in floating point number when the message was stored.
=back
=item original =E<gt> $original
If the message was decrypted, stores original encrypted form.
=back
Returns:
If storing succeeded, marshalled metadata (file name) of the message.
Otherwise C<undef>.
=back
=head1 SEE ALSO
L<sympa_msg(8)>, L<Sympa::Message>.
=head1 HISTORY
L<Sympa::Spool::Incoming> appeared on Sympa 6.2.5.
=cut
......@@ -46,6 +46,7 @@ use Sympa::Mailer;
use Sympa::Message;
use Sympa::Report;
use Sympa::Spool;
use Sympa::Spool::Incoming;
use tools;
use Sympa::Tools::Daemon;
use Sympa::Tools::Data;
......@@ -182,7 +183,8 @@ $SIG{'PIPE'} = 'IGNORE'; # Ignore SIGPIPE ; prevents process from dying
# This is the main loop : look for files in the directory, handles
# them, sleeps a while and continues the good job.
my $spool = $Conf::Conf{'queueautomatic'};
my $spool = $Conf::Conf{'queueautomatic'};
my $spool_incoming = Sympa::Spool::Incoming->new;
while ($signal ne 'term') {
Sympa::List::init_list_cache();
......@@ -289,14 +291,15 @@ while ($signal ne 'term') {
$log->syslog('notice', "Sympa %s reload config",
Sympa::Constants::VERSION);
_load();
$signal = 0;
$spool_incoming = Sympa::Spool::Incoming->new;
$signal = 0;
}
# If the spool was empty, sleep for a while.
unless ($numprocessed) {
sleep $Conf::Conf{'sleep'};
}
} #end of block while ($signal ne 'term'){
} # while ($signal ne 'term')
$log->syslog('notice', 'Sympa/automatic exited normally due to signal');
Sympa::Tools::Daemon::remove_pid($pidfile, $PID);
......@@ -769,11 +772,7 @@ sub process_message {
# do not process messages in list creation only mode, move them to
# main spool
my $marshalled = Sympa::Spool::store_spool(
$Conf::Conf{'queue'}, $message, '%s@%s.%ld.%ld,%d',
[qw(localpart domainpart date PID RAND)],
original => 1
);
my $marshalled = $spool_incoming->store($message, original => 1);
if ($marshalled) {
$log->syslog('notice',
'Message %s is stored into incoming spool as <%s>',
......
......@@ -49,6 +49,7 @@ use Sympa::Regexps;
use Sympa::Report;
use Sympa::Scenario;
use Sympa::Spool;
use Sympa::Spool::Incoming;
use Sympa::Tools::Daemon;
use Sympa::Tools::Data;
use Sympa::Tools::File;
......@@ -268,27 +269,13 @@ my $index_cleanqueue = 0;
# This is the main loop : look for files in the directory, handles
# them, sleeps a while and continues the good job.
my $spool = $Conf::Conf{'queue'};
my $spool = Sympa::Spool::Incoming->new;
while ($signal ne 'term') {
Sympa::List::init_list_cache();
# Process grouped notifications
Sympa::Alarm->instance->flush;
my $dh;
unless (opendir $dh, $spool) {
die sprintf 'Can\'t open dir %s: %s', $spool, $ERRNO;
# No return.
}
my @qfile =
grep { !/,lock/ and !/\A(?:\.|T\.|BAD-)/ and -f ($spool . '/' . $_) }
readdir $dh;
closedir $dh;
my %mtime =
map { ($_ => Sympa::Tools::File::get_mtime($spool . '/' . $_)) }
@qfile;
@qfile = sort { $mtime{$a} <=> $mtime{$b} } @qfile;
# process digest only in distribution mode.
# Scan queuedigest.
if ( $daemon_usage eq 'DAEMON_MASTER'
......@@ -304,119 +291,65 @@ while ($signal ne 'term') {
$latest_msgid_table_cleanup = time;
}
my $highest_priority = 'z';
my $numprocessed = 0;
foreach my $filename (@qfile) {
last if $signal;
$language->set_lang($default_lang);
$mailer->reaper(); # finish terminated process
my $lock_fh =
Sympa::LockedFile->new($spool . '/' . $filename, -1, '+<');
next unless $lock_fh;