Commit 301f6dd1 authored by sikeda's avatar sikeda
Browse files

[dev] Refactoring. Introducing Sympa::Spool::Digest and...

[dev] Refactoring.  Introducing Sympa::Spool::Digest and Sympa::Spool::Digest::Collection for digest spools.

ToDo: Careful tests.


git-svn-id: https://subversion.renater.fr/sympa/branches/sympa-6.2-branch@12363 05aa8bb8-cd2b-0410-b1d7-8918dfa770ce
parent 03731167
......@@ -800,8 +800,8 @@ sub checkfiles {
}
foreach my $qdir (
'spool', 'queuedigest', 'queuemod', 'queuetopic',
'queueauth', 'queuesubscribe', 'queuetask', 'tmpdir'
'spool', 'queuemod', 'queuetopic', 'queueauth',
'queuesubscribe', 'queuetask', 'tmpdir'
) {
unless (-d $Conf{$qdir}) {
$log->syslog('info', 'Creating spool %s', $Conf{$qdir});
......
......@@ -79,6 +79,8 @@ nobase_modules_DATA = \
Sympa/Spool/Archive.pm \
Sympa/Spool/Automatic.pm \
Sympa/Spool/Bounce.pm \
Sympa/Spool/Digest.pm \
Sympa/Spool/Digest/Collection.pm \
Sympa/Spool/Incoming.pm \
Sympa/Task.pm \
Sympa/Template.pm \
......
......@@ -65,6 +65,7 @@ use Sympa::Scenario;
use SDM;
use Sympa::Spool;
use Sympa::Spool::Archive;
use Sympa::Spool::Digest;
use Sympa::Task;
use Sympa::Template;
use tools;
......@@ -1743,7 +1744,8 @@ sub distribute_msg {
'smime_crypted'
)
) {
$self->store_digest($message);
my $spool_digest = Sympa::Spool::Digest->new(context => $self);
$spool_digest->store($message) if $spool_digest;
}
## Synchronize list members, required if list uses include sources
......@@ -2105,60 +2107,54 @@ sub _mail_message {
# | 0 if no subscriber for sending digest
# | undef
####################################################
# Old name: send_msg_digest().
# Old name: List::send_msg_digest().
# Note: This would be moved to Pipeline package.
sub distribute_digest {
$log->syslog('debug2', '(%s, ...)', @_);
my $self = shift;
my %options = @_;
my $collection = shift;
my $spool = shift;
my $spool_handle = shift;
my %options = @_;
my $spool = $Conf::Conf{'queuedigest'} . '/' . $self->get_id;
my $list = $spool->{context};
my $available_recipients = $self->get_digest_recipients_per_mode;
my $available_recipients = $list->get_digest_recipients_per_mode;
unless ($available_recipients) {
$log->syslog('info', 'No subscriber for sending digest in list %s',
$self);
$list);
unless ($options{keep_digest}) {
# Locking directory to remove it exclusively.
my $lock_fh_dir = Sympa::LockedFile->new($spool, -1, '+');
return 0 unless $lock_fh_dir;
Sympa::Tools::File::remove_dir($spool); # even if it is NOT empty.
# Releasing lock.
$lock_fh_dir->close;
while (1) {
my ($message, $handle) = $spool->next;
if ($message and $handle) {
$spool->remove($handle);
} elsif ($handle) {
$log->syslog('err', 'Cannot parse message <%s>',
$handle->basename);
$spool->quarantine($handle);
} else {
last;
}
}
}
return 0;
}
my $dh;
unless (opendir $dh, $spool) {
return undef;
}
my @qfile = sort
grep { !/,lock/ and !/\A(?:\.|T\.|BAD-)/ and -f ($spool . '/' . $_) }
readdir $dh;
closedir $dh;
my $time = time;
# Digest index.
my @all_msg;
my $i = 0;
foreach my $filename (@qfile) {
my $lock_fh =
Sympa::LockedFile->new($spool . '/' . $filename, -1, '+<');
next unless $lock_fh;
my $metadata =
Sympa::Spool::unmarshal_metadata($spool, $filename,
qr{\A(\d+)\.(\d+\.\d+)(?:,.*)?\z},
[qw(date time)]);
next unless $metadata;
my $msg_string = do { local $RS; <$lock_fh> };
my $message =
Sympa::Message->new($msg_string, %$metadata, context => $self);
next unless $message;
while (1) {
my ($message, $handle) = $spool->next;
last unless $handle; # No more messages.
unless ($message) {
$log->syslog('err', 'Cannot parse message <%s>',
$handle->basename);
$spool->quarantine($handle);
next;
}
$i++;
......@@ -2178,28 +2174,21 @@ sub distribute_digest {
};
push @all_msg, $msg;
$lock_fh->unlink unless $options{keep_digest};
# Locking directory to remove it exclusively.
my $lock_fh_dir = Sympa::LockedFile->new($spool, -1, '+');
next unless $lock_fh_dir;
rmdir $spool; # if it is empty.
# Releasing lock.
$lock_fh_dir->close;
$spool->remove($handle) unless $options{keep_digest};
}
my $param = {
'replyto' => $self->get_list_address('owner'),
'to' => $self->get_list_address(),
'replyto' => $list->get_list_address('owner'),
'to' => $list->get_list_address(),
'boundary1' => '----------=_'
. tools::get_message_id($self->{'domain'}),
. tools::get_message_id($list->{'domain'}),
'boundary2' => '----------=_'
. tools::get_message_id($self->{'domain'}),
. tools::get_message_id($list->{'domain'}),
};
# Compat. to 6.2a or earlier
$param->{'table_of_content'} = $language->gettext("Table of contents:");
if ($self->get_reply_to() =~ /^list$/io) {
if ($list->get_reply_to() =~ /^list$/io) {
$param->{'replyto'} = "$param->{'to'}";
}
......@@ -2211,7 +2200,7 @@ sub distribute_digest {
## Split messages into groups of digest_max_size size
my @group_of_msg;
while (@all_msg) {
my @group = splice @all_msg, 0, $self->{'admin'}{'digest_max_size'};
my @group = splice @all_msg, 0, $list->{'admin'}{'digest_max_size'};
push @group_of_msg, \@group;
}
......@@ -2230,20 +2219,20 @@ sub distribute_digest {
next unless exists $available_recipients->{$mode};
my $digest_message =
Sympa::Message->new_from_template($self, $mode,
Sympa::Message->new_from_template($list, $mode,
$available_recipients->{$mode}, $param);
if ($digest_message) {
# Add RFC 2919 header field
$self->add_list_header($digest_message, 'id');
$list->add_list_header($digest_message, 'id');
# Add RFC 2369 header fields
foreach my $field (
@{ tools::get_list_params($self->{'domain'})
@{ tools::get_list_params($list->{'domain'})
->{'rfc2369_header_fields'}->{'format'}
}
) {
if (scalar grep { $_ eq $field }
@{$self->{'admin'}{'rfc2369_header_fields'}}) {
$self->add_list_header($digest_message, $field);
@{$list->{'admin'}{'rfc2369_header_fields'}}) {
$list->add_list_header($digest_message, $field);
}
}
}
......@@ -2252,20 +2241,20 @@ sub distribute_digest {
$available_recipients->{$mode})) {
$log->syslog('notice',
'Unable to send template "%s" to %s list subscribers',
$mode, $self);
$mode, $list);
next;
}
# Add number and size of digests sent to total in stats file.
my $numsent = scalar @{$available_recipients->{$mode}};
my $bytes = length $digest_message->as_string;
$self->{'stats'}[1] += $numsent;
$self->{'stats'}[2] += $bytes;
$self->{'stats'}[3] += $bytes * $numsent;
$list->{'stats'}[1] += $numsent;
$list->{'stats'}[2] += $bytes;
$list->{'stats'}[3] += $bytes * $numsent;
}
}
$self->savestats();
$list->savestats();
return 1;
}
......@@ -5699,22 +5688,24 @@ sub is_archiving_enabled {
}
## Returns 1 if the digest must be sent.
sub get_nextdigest {
# Old name: Sympa::List::get_nextdigest().
# Note: this would be moved to Pipeline package.
sub may_distribute_digest {
$log->syslog('debug3', '(%s)', @_);
my $self = shift;
my $spool = shift;
my $spool = $Conf::Conf{'queuedigest'} . '/' . $self->get_id;
return undef unless -d $spool;
my $list = $spool->{context};
return undef unless $self->is_digest;
return undef unless defined $spool->{time};
return undef unless $list->is_digest;
my @days = @{$self->{'admin'}{'digest'}->{'days'} || []};
my $hh = $self->{'admin'}{'digest'}->{'hour'} || 0;
my $mm = $self->{'admin'}{'digest'}->{'minute'} || 0;
my @days = @{$list->{'admin'}{'digest'}->{'days'} || []};
my $hh = $list->{'admin'}{'digest'}->{'hour'} || 0;
my $mm = $list->{'admin'}{'digest'}->{'minute'} || 0;
my @now = localtime time;
my $today = $now[6]; # current day
my @timedigest = localtime Sympa::Tools::File::get_mtime($spool);
my @now = localtime time;
my $today = $now[6]; # current day
my @timedigest = localtime $spool->{time};
## Should we send a digest today
my $send_digest = 0;
......@@ -5726,15 +5717,9 @@ sub get_nextdigest {
}
return undef unless $send_digest;
if (($now[2] * 60 + $now[1]) >= ($hh * 60 + $mm)
and (
Time::Local::timelocal(0, $mm, $hh, $now[3], $now[4], $now[5]) >
Time::Local::timelocal(
0, $timedigest[1], $timedigest[2],
$timedigest[3], $timedigest[4], $timedigest[5]
)
)
) {
if ($hh * 60 + $mm <= $now[2] * 60 + $now[1]
and Time::Local::timelocal(0, @timedigest[1 .. 5]) <
Time::Local::timelocal(0, $mm, $hh, @now[3 .. 5])) {
return 1;
}
......@@ -8783,37 +8768,8 @@ sub _save_list_members_file {
## Does the real job : stores the message given as an argument into
## the digest of the list.
sub store_digest {
$log->syslog('debug3', '(%s, %s)', @_);
my $self = shift;
my $message = shift->dup;
# Delete original message ID because it can be anonymized.
delete $message->{message_id};
unless (-d $Conf::Conf{'queuedigest'}) {
return unless mkdir $Conf::Conf{'queuedigest'};
}
my $spool = $Conf::Conf{'queuedigest'} . '/' . $self->get_id;
# Locking directory to prevent removal.
my $lock_fh_dir = Sympa::LockedFile->new($spool, 5, '+');
return unless $lock_fh_dir;
unless (-d $spool) {
return unless mkdir $spool;
}
my $oldtime = Sympa::Tools::File::get_mtime($spool);
my $marshalled =
Sympa::Spool::store_spool($spool, $message, '%ld.%f,%ld,%d',
[qw(date TIME PID RAND)]);
utime $oldtime, $oldtime, $spool;
# Releasing lock.
$lock_fh_dir->close;
return $marshalled;
}
# Moved to Sympa::Spool::Digest::store().
#sub store_digest;
=over 4
......@@ -11161,11 +11117,14 @@ sub purge {
$self->close_list();
if ($self->{'name'}) {
#FIXME: Lock directories to remove them safely.
my $archive = Sympa::Archive->new($self);
my $digest = Sympa::Spool::Digest->new(context => $self);
my $tracking = Sympa::Tracking->new($self);
my $error;
File::Path::remove_tree($archive->{base_directory},
{error => \$error});
File::Path::remove_tree($digest->{directory}, {error => \$error});
File::Path::remove_tree($tracking->{directory}, {error => \$error});
}
......
......@@ -44,15 +44,19 @@ my $log = Sympa::Log->instance;
# Methods.
sub new {
my $class = shift;
my $class = shift;
my %options = @_;
die $EVAL_ERROR unless eval sprintf 'require %s', $class->_generator;
my $self =
bless {%{$class->_directories}, _metadatas => undef,} => $class;
my $self = bless {
%options,
%{$class->_directories(%options) || {}},
_metadatas => undef,
} => $class;
$self->_create;
$self->_init;
$self->_init(0) or return undef;
$self;
}
......@@ -65,9 +69,11 @@ sub _create {
unless (-d $directory) {
$log->syslog('info', 'Creating directory %s of %s',
$directory, $self);
unless (mkdir $directory, 0775 or -d $directory) {
die sprintf 'Cannot create %s: %s', $directory, $ERRNO;
}
unless (
mkdir($directory, 0775)
and Sympa::Tools::File::set_file_rights(
Sympa::Tools::File::set_file_rights(
file => $directory,
user => Sympa::Constants::USER(),
group => Sympa::Constants::GROUP()
......@@ -92,7 +98,7 @@ sub next {
}
unless ($self->{_metadatas} and @{$self->{_metadatas}}) {
undef $self->{_metadatas};
$self->_init;
$self->_init(1);
return;
}
......@@ -103,7 +109,7 @@ sub next {
# be skipped.
$handle =
Sympa::LockedFile->new($self->{directory} . '/' . $marshalled,
-1, '+<');
-1, $self->_is_collection ? '+' : '+<');
next unless $handle;
$metadata = Sympa::Spool::unmarshal_metadata(
......@@ -114,8 +120,12 @@ sub next {
if ($metadata) {
next unless $self->_filter($metadata);
my $msg_string = do { local $RS; <$handle> };
$message = $self->_generator->new($msg_string, %$metadata);
if ($self->_is_collection) {
$message = $self->_generator->new(%$metadata);
} else {
my $msg_string = do { local $RS; <$handle> };
$message = $self->_generator->new($msg_string, %$metadata);
}
}
# Though message might not be deserialized, anyway return the result.
......@@ -131,11 +141,14 @@ sub _load {
unless (opendir $dh, $self->{directory}) {
die sprintf 'Cannot open dir %s: %s', $self->{directory}, $ERRNO;
}
my $iscol = $self->_is_collection;
my $metadatas = [
sort grep {
!/,lock/
and !m{(?:\A|/)(?:\.|T\.|BAD-)}
and -f ($self->{directory} . '/' . $_)
and ((not $iscol and -f ($self->{directory} . '/' . $_))
or ($iscol and -d ($self->{directory} . '/' . $_)))
} readdir $dh
];
closedir $dh;
......@@ -145,6 +158,8 @@ sub _load {
sub _filter {1}
sub _is_collection {0}
sub quarantine {
my $self = shift;
my $handle = shift;
......@@ -166,7 +181,13 @@ sub remove {
my $self = shift;
my $handle = shift;
return $handle->unlink;
if ($self->_is_collection) {
return undef
unless rmdir($self->{directory} . '/' . $handle->basename);
return $handle->close;
} else {
return $handle->unlink;
}
}
sub store {
......@@ -174,6 +195,8 @@ sub store {
my $message = shift->dup;
my %options = @_;
return if $self->_is_collection;
$message->{date} = time unless defined $message->{date};
my $marshalled =
......@@ -244,8 +267,8 @@ sub split_listname {
# Old name: SympaspoolClassic::analyze_file_name().
sub unmarshal_metadata {
$log->syslog('debug3', '(%s, %s, %s)', @_);
my $spool_dir = shift;
my $marshalled = shift;
my $spool_dir = shift;
my $marshalled = shift;
my $marshal_regexp = shift;
my $marshal_keys = shift;
......@@ -309,7 +332,7 @@ sub unmarshal_metadata {
}
sub marshal_metadata {
my $message = shift;
my $message = shift;
my $marshal_format = shift;
my $marshal_keys = shift;
......@@ -357,11 +380,11 @@ sub marshal_metadata {
}
sub store_spool {
my $spool_dir = shift;
my $message = shift;
my $spool_dir = shift;
my $message = shift;
my $marshal_format = shift;
my $marshal_keys = shift;
my %options = @_;
my %options = @_;
# At first content is stored into temporary file that has unique name and
# is referred only by this function.
......@@ -618,7 +641,7 @@ I<Instance method>, I<overridable>.
Creates spool.
By default, creates directories returned by _directories().
=item _directories ( )
=item _directories ( [ options, ... ] )
I<Class or instance method>, I<mandatory for filesystem spool>.
Returns hashref with directory paths related to the spool as values.
......@@ -635,13 +658,25 @@ By default, always returns true value.
I<Class or instance method>, I<mandatory>.
Returns name of the class to serialize and deserialize messages in the spool.
The class must implement methods dup(), new() and to_string().
If spool subclass is the collection (see _is_collection),
generator class must implement new().
Otherwise,
generator class must implement dup(), new() and to_string().
=item _init ( )
=item _init ( $state )
I<Instance method>.
Additional processing when _load() returns no contents or
when the spool class is instantiated.
Additional processing when _load() returns no contents ($state is 1) or
when the spool class is instantiated ($state is 0).
=item _is_collection ( )
I<Instance method>, I<overridable>.
If the class is collection of spool class, returns true value.
By default returns false value.
Collection class does not have store() method.
Its content is the set of spool instances.
=item _load ( )
......
# -*- indent-tabs-mode: nil; -*-
# vim:ft=perl:et:sw=4
# $Id$
# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright (c) 1997, 1998, 1999 Institut Pasteur & Christophe Wolfhugel
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015 GIP RENATER
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
package Sympa::Spool::Digest;
use strict;
use warnings;
use English qw(-no_match_vars);
use Conf;
use Sympa::Tools::File;
use base qw(Sympa::Spool);
sub _directories {
my $self = shift;
my %options = @_;
my $list = ref($self) ? $self->{context} : $options{context};
die 'bug in logic. Ask developer' unless ref $list eq 'Sympa::List';
return {
parent_directory => $Conf::Conf{'queuedigest'},
directory => $Conf::Conf{'queuedigest'} . '/' . $list->get_id,
bad_directory => $Conf::Conf{'queuedigest'} . '/'
. $list->get_id . '/bad',
};
}
use constant _generator => 'Sympa::Message';
sub _init {
my $self = shift;
my $status = shift;
unless ($status) {
# Get earliest time of messages in the spool.
my $metadatas = $self->_load;
unless ($metadatas and @$metadatas) {
$self->{time} = undef;
} else {
$self->{time} = Sympa::Tools::File::get_mtime(
$self->{directory} . '/' . $metadatas->[0]);
}
}
return 1;
}
use constant _marshal_format => '%ld.%f,%ld,%d';
use constant _marshal_keys => [qw(date TIME PID RAND)];
use constant _marshal_regexp => qr{\A(\d+)\.(\d+\.\d+)(?:,.*)?\z};
sub next {
my $self = shift;
my ($message, $handle) = $self->SUPER::next();
if ($message) {
# Assign context which is not given by metadata.
$message->{context} = $self->{context};
}
return ($message, $handle);
}
# Old name: Sympa::List::store_digest().
sub store {
my $self = shift;
my $message = shift->dup;
# Delete original message ID because it can be anonymized.
delete $message->{message_id};
return $self->SUPER::store($message);
}
sub get_id {
my $self = shift;
if ($self->{context}) {
if (ref $self->{context} eq 'Sympa::List') {
return $self->{context}->get_id;
} else {
return $self->{context};
}
} else {
return '';
}
}
1;
__END__
=encoding utf-8
=head1 NAME
Sympa::Spool::Digest - Spool for messages waiting for digest sending
=head1 SYNOPSIS