# -*- indent-tabs-mode: nil; -*-
# vim:ft=perl:et:sw=4
# $Id$

# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright (c) 1997, 1998, 1999 Institut Pasteur & Christophe Wolfhugel
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2017 The Sympa Community. See the AUTHORS.md file at the top-level
# directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

package Sympa::Bulk;

use strict;
use warnings;
use Cwd qw();
use English qw(-no_match_vars);
use File::Copy qw();
use Time::HiRes qw();

use Conf;
use Sympa::Constants;
use Sympa::LockedFile;
use Sympa::Log;
use Sympa::Message;
use Sympa::Spool;
use Sympa::Tools::File;

# Logger shared by every sub in this module.
my $log = Sympa::Log->instance;

47
48
49
# Constructor.
# Builds an instance whose spool paths all live below
# $Conf::Conf{'queuebulk'}, then makes sure those directories exist.
# Returns the blessed instance.  Dies (inside _create_spool) if a spool
# directory cannot be created.
sub new {
    my $class = shift;

    my $self = bless {
        msg_directory     => $Conf::Conf{'queuebulk'} . '/msg',
        pct_directory     => $Conf::Conf{'queuebulk'} . '/pct',
        bad_directory     => $Conf::Conf{'queuebulk'} . '/bad',
        bad_msg_directory => $Conf::Conf{'queuebulk'} . '/bad/msg',
        bad_pct_directory => $Conf::Conf{'queuebulk'} . '/bad/pct',
        # Cached list of packets still to be handed out by next().
        # undef means the packet spool has to be scanned again.
        _metadatas        => undef,
    } => $class;

    $self->_create_spool;

    return $self;
}

# Creates the spool directories that do not exist yet.
# Directories are created with mode 0755 under the umask configured in
# $Conf::Conf{'umask'} and then handed to the Sympa user and group.
# Dies if a directory cannot be created or its ownership cannot be set.
# The umask of the process is restored in every case, including failure.
sub _create_spool {
    my $self = shift;

    my $umask = umask oct $Conf::Conf{'umask'};
    foreach my $directory (
        $Conf::Conf{queuebulk},     $self->{msg_directory},
        $self->{pct_directory},     $self->{bad_directory},
        $self->{bad_msg_directory}, $self->{bad_pct_directory}
        ) {
        next if -d $directory;

        $log->syslog('info', 'Creating spool %s', $directory);
        unless (
            mkdir($directory, 0755)
            and Sympa::Tools::File::set_file_rights(
                file  => $directory,
                user  => Sympa::Constants::USER(),
                group => Sympa::Constants::GROUP()
            )
            ) {
            # Keep the error text before doing anything else, then put the
            # caller's umask back: die would otherwise leave it altered.
            my $error = "$ERRNO";
            umask $umask;
            die sprintf 'Cannot create %s: %s', $directory, $error;
        }
    }
    umask $umask;
}
89

90
# Gets the next packet to be processed.
#
# The first call (and every call after the cached list ran out) scans the
# packet spool and caches the sorted names of the packet files found.
# Following calls consume that cache.
#
# Returns:
#   * ($message, $lock_fh) for the first packet that could be locked and is
#     due.  $message is undefined if the metadata could not be unmarshalled
#     or the message could not be loaded.  $lock_fh is the
#     Sympa::LockedFile handle locking the packet.
#   * Empty list if the cache is empty or no remaining packet was usable.
#
# Dies if it cannot chdir into the packet spool.
sub next {
    my $self = shift;

    unless ($self->{_metadatas}) {
        my $cwd = Cwd::getcwd();
        unless (chdir $self->{pct_directory}) {
            die sprintf 'Cannot chdir to %s: %s', $self->{pct_directory},
                $ERRNO;
        }
        # Relative "<directory>/<packet>" names.  Lock files, hidden files
        # and entries prefixed with "T." or "BAD-" are left out.
        $self->{_metadatas} = [
            sort grep {
                        !/,lock/
                    and !m{(?:\A|/)(?:\.|T\.|BAD-)}
                    and -f ($self->{pct_directory} . '/' . $_)
            } glob '*/*'
        ];
        # Go back to where we were.  A failure here used to be silent and
        # left the whole process inside the packet spool.
        if (defined $cwd) {
            chdir $cwd
                or $log->syslog('err', 'Cannot chdir back to %s: %m', $cwd);
        }
    }
    unless (@{$self->{_metadatas}}) {
        # Nothing left: forget the cache so that the next call rescans.
        undef $self->{_metadatas};
        return;
    }

    while (my $marshalled = shift @{$self->{_metadatas}}) {
        my ($lock_fh, $metadata, $message);

        # Try locking packet.  Those locked or removed by other process will
        # be skipped.
        $lock_fh =
            Sympa::LockedFile->new($self->{pct_directory} . '/' . $marshalled,
            -1, '+<');
        next unless $lock_fh;

        # FIXME: The list or the robot that injected packet can no longer be
        # available.
        $metadata = Sympa::Spool::unmarshal_metadata(
            $self->{pct_directory},
            $marshalled,
            qr{\A(\w+)\.(\w+)\.(\d+)\.(\d+\.\d+)\.([^\s\@]*)\@([\w\.\-*]*)_(\w+),(\d+),(\d+)/(\w+)\z},
            [   qw(priority packet_priority date time localpart domainpart tag pid rand serial)
            ]
        );

        if ($metadata) {
            # Skip messages not yet to be delivered.
            next unless $metadata->{date} <= time;

            # Name of the message file: the packet name without the
            # trailing "/<serial>" part.
            my $msg_file = Sympa::Spool::marshal_metadata(
                $metadata,
                '%s.%s.%d.%f.%s@%s_%s,%ld,%d',
                [   qw(priority packet_priority date time localpart domainpart tag pid rand)
                ]
            );
            $message = Sympa::Message->new_from_file(
                $self->{msg_directory} . '/' . $msg_file, %$metadata);

            if ($message) {
                # The packet file holds the recipients, one per line.
                my $rcpt_string = do { local $RS; <$lock_fh> };
                $message->{rcpt} = [split /\n+/, $rcpt_string];
            }
        }

        # Though message might not be deserialized, anyway return the result.
        return ($message, $lock_fh);
    }
    return;
}

158
# Quarantines a packet.
#
# Parameter:
#   $lock_fh - Sympa::LockedFile handle locking the packet, as returned by
#              next().
#
# The message is copied into bad/msg (unless a copy is already there) and
# the packet is moved into bad/pct/<message>/.  If that is not possible the
# packet is renamed in place with a "BAD-" prefix instead.  When the packet
# directory can then be removed, i.e. no packet is left, the original
# message is removed as well.
#
# Returns 1 on success, undef if the packet could not be renamed at all.
sub quarantine {
    my $self    = shift;
    my $lock_fh = shift;

    # NOTE(review): basename(1) is used throughout this module as the name
    # of the directory containing the packet - confirm against
    # Sympa::LockedFile.
    my $marshalled        = $lock_fh->basename(1);
    my $msg_file          = $self->{msg_directory} . '/' . $marshalled;
    my $pct_directory     = $self->{pct_directory} . '/' . $marshalled;
    my $bad_pct_directory = $self->{bad_pct_directory} . '/' . $marshalled;
    my $bad_msg_file      = $self->{bad_msg_directory} . '/' . $marshalled;

    unless (-e $bad_msg_file) {
        # The result of the copy used to be ignored.
        File::Copy::cp($msg_file, $bad_msg_file)
            or $log->syslog('err', 'Cannot copy %s to %s: %m',
            $msg_file, $bad_msg_file);
    }

    my $bad_pct_file = $bad_pct_directory . '/' . $lock_fh->basename;
    mkdir $bad_pct_directory unless -d $bad_pct_directory;
    unless (-d $bad_pct_directory and $lock_fh->rename($bad_pct_file)) {
        # Fall back to marking the packet as bad inside the packet spool.
        $bad_pct_file =
              $self->{pct_directory} . '/BAD-'
            . $lock_fh->basename(1) . '-'
            . $lock_fh->basename;
        return undef unless $lock_fh->rename($bad_pct_file);
    }

    if (rmdir $pct_directory) {
        # No more packet.  Drop the original message only if a quarantined
        # copy exists, otherwise the message would be lost altogether.
        unlink $msg_file if -e $bad_msg_file;
    }
    return 1;
}
186

sikeda's avatar
sikeda committed
187
# Removes a packet.
#
# Parameter:
#   $lock_fh - Sympa::LockedFile handle locking the packet, as returned by
#              next().
#
# If the packet directory can be removed afterwards, i.e. this was the last
# packet, the corresponding message is removed from the message spool too.
#
# Returns 1 if the packet was removed, otherwise undef.
sub remove {
    my $self    = shift;
    my $lock_fh = shift;

    # Read the directory name before the packet is unlinked.
    my $marshalled = $lock_fh->basename(1);

    return undef unless $lock_fh->unlink;

    if (rmdir($self->{pct_directory} . '/' . $marshalled)) {
        # No more packet.
        unlink($self->{msg_directory} . '/' . $marshalled);
    }
    return 1;
}

# DEPRECATED: No longer used.
#sub messageasstring($messagekey);

# fetch message from bulkspool_table by key
# Old name: Sympa::Bulk::message_from_spool()
# DEPRECATED: Not used.
#sub fetch_content($messagekey);

# DEPRECATED: Use Sympa::Message::personalize().
# sub merge_msg;

# DEPRECATED: Use Sympa::Message::personalize_text().
# sub merge_data ($rcpt, $listname, $robot_id, $data, $body, \$message_output)

# Stores a message and its recipients into the bulk spool.
#
# Parameters:
#   $message - Message object.  A duplicate (->dup) is stored, the caller's
#              object is not modified.
#   $rcpt    - Single recipient (scalar) or arrayref of recipients.
#   %options - tag => $tag (defaults to 's'); all options are also passed
#              on to Sympa::Spool::store_spool().
#
# The message goes into the message spool first, then the recipients are
# written as one or more packet files into a packet directory named after
# the stored message.
#
# Returns a hashref { marshalled => <name of the stored message>,
# total_packets => <number of packets> } on success.  Returns nothing
# (undef in scalar context) if the message could not be stored, the packet
# directory could not be created or a packet could not be locked.
sub store {
    my $self    = shift;
    my $message = shift->dup;
    my $rcpt    = shift;
    my %options = @_;

    delete $message->{rcpt};    #FIXME

    my ($list, $robot_id);
    if (ref($message->{context}) eq 'Sympa::List') {
        $list     = $message->{context};
        $robot_id = $message->{context}->{'domain'};
    } elsif ($message->{context} and $message->{context} ne '*') {
        $robot_id = $message->{context};
    } else {
        $robot_id = '*';
    }

    my $tag = $options{tag};
    $tag = 's' unless defined $tag;
    $message->{tag} = $tag;

    # A priority already set on the message wins; otherwise use the one of
    # the list or, failing that, of the robot.
    unless (defined $message->{priority} and length $message->{priority}) {
        $message->{priority} =
              $list
            ? $list->{admin}{priority}
            : Conf::get_robot_conf($robot_id, 'sympa_priority');
    }
    $message->{packet_priority} =
        Conf::get_robot_conf($robot_id, 'sympa_packet_priority');
    $message->{date} = time unless defined $message->{date};
    $message->{time} = Time::HiRes::time();

    # First, store the message in bulk/msg spool, because as soon as packets
    # are created bulk.pl may distribute them.

    my $marshalled = Sympa::Spool::store_spool(
        $self->{msg_directory},
        $message,
        '%s.%s.%d.%f.%s@%s_%s,%ld,%d',
        [   qw(priority packet_priority date time localpart domainpart tag PID RAND)
        ],
        %options
    );
    return unless $marshalled;

    my $packet_directory = $self->{pct_directory} . '/' . $marshalled;
    unless (mkdir $packet_directory) {
        $log->syslog(
            'err',
            'Cannot mkdir %s/%s: %m',
            $self->{pct_directory}, $marshalled
        );
        unlink($self->{msg_directory} . '/' . $marshalled);
        return;
    }

    # Second, create each recipient packet in bulk/pct spool.

    my @rcpts;
    unless (ref $rcpt) {
        @rcpts = ([$rcpt]);
    } else {
        @rcpts = _get_recipient_tabs_by_domain($robot_id, @{$rcpt || []});
    }
    my $total_sent = scalar @rcpts;

    # Create a temporary lock file in the packet directory to prevent bulk.pl
    # from removing packet directory and the message during addition of
    # packets.
    my $lock_fh_tmp =
        Sympa::LockedFile->new($packet_directory . '/dont_rmdir', -1, '+');

    # The first packet is named after the tag, the following ones are
    # numbered '0001', '0002', ...
    my $serial = $message->{tag};
    while (my $packet_rcpts = shift @rcpts) {
        my $lock_fh =
            Sympa::LockedFile->new($packet_directory . '/' . $serial,
            5, '>>');
        unless ($lock_fh) {
            $log->syslog('err', 'Cannot create packet %s/%s',
                $packet_directory, $serial);
            return;
        }

        # Release the guard just before the last packet is written.  The
        # guard may be undefined if it could not be created; calling close
        # on it unconditionally would die.
        $lock_fh_tmp->close if $lock_fh_tmp and not @rcpts;

        print {$lock_fh} join("\n", @{$packet_rcpts}) . "\n";
        $lock_fh->close;

        if (length $serial == 1) {    # '0', 's' or 'z'.
            $serial = '0001';
        } else {
            $serial++;                # String increment: '0001' -> '0002'.
        }
    }

    $log->syslog('notice', 'Message %s is stored into bulk spool as <%s>',
        $message, $marshalled);
    return {marshalled => $marshalled, total_packets => $total_sent};
}

# Old name: (part of) Sympa::Mail::mail_message().
# Splits a list of recipients into packets.
#
# Parameters:
#   $robot_id - Robot whose 'avg' and 'nrcpt' parameters are applied.
#   @rcpt     - Recipient addresses, in the order they are to be packed.
#
# Recipients are taken in order and accumulated into the current packet.
# The current packet is closed and a new one started, before the recipient
# at hand is added, when one of these holds:
#   * the recipient's domain has a limit in $Conf::Conf{'nrcpt_by_domain'}
#     and the count for that domain has reached it;
#   * the packet holds more than 'avg' recipients and the last two labels
#     of the address differ from those of the previous recipient;
#   * the packet holds at least 'nrcpt' recipients.
#
# Returns a list of arrayrefs, one per packet; nothing if @rcpt is empty.
# A packet is never empty.
sub _get_recipient_tabs_by_domain {
    my $robot_id = shift;
    my @rcpt     = @_;

    return unless @rcpt;

    my ($i, $j);    # Current and previous recipient.
    my $nrcpt = 0;  # Number of recipients in the current packet.

    my %rcpt_by_dom;

    my @sendto;
    my @sendtobypacket;

    while (defined($i = shift @rcpt)) {
        # Labels of the addresses, rightmost first.
        my @k = reverse split /[\.@]/, $i;
        my @l = reverse split /[\.@]/, (defined $j ? $j : '@');

        # Addresses without "@" get an empty domain: undef would be used as
        # the same hash key anyway, but with warnings.
        my $dom = '';
        if ($i =~ /\@(.*)$/) {
            $dom = $1;
            chomp $dom;
        }
        $rcpt_by_dom{$dom} += 1;
        $log->syslog(
            'debug2',
            'Domain: %s; rcpt by dom: %s; limit for this domain: %s',
            $dom,
            $rcpt_by_dom{$dom},
            $Conf::Conf{'nrcpt_by_domain'}{$dom}
        );

        # Last two labels of both addresses; missing labels count as empty.
        my $tail_i = lc join ' ', map { defined $_ ? $_ : '' } @k[0, 1];
        my $tail_j = lc join ' ', map { defined $_ ? $_ : '' } @l[0, 1];

        if (
            # number of recipients by each domain
            (   defined $Conf::Conf{'nrcpt_by_domain'}{$dom}
                and $rcpt_by_dom{$dom} >= $Conf::Conf{'nrcpt_by_domain'}{$dom}
            )
            or
            # number of different domains
            (       $j
                and scalar(@sendto) > Conf::get_robot_conf($robot_id, 'avg')
                and $tail_i ne $tail_j
            )
            or
            # number of recipients in general
            (@sendto and $nrcpt >= Conf::get_robot_conf($robot_id, 'nrcpt'))
            ) {
            undef %rcpt_by_dom;
            # The per-domain limit can trigger while nothing has been
            # accumulated yet (e.g. a limit of 1 on the first recipient).
            # Do not emit an empty packet in that case.
            if (@sendto) {
                # Push a copy, never \@sendto itself: @sendto is reused.
                push @sendtobypacket, [@sendto];
            }
            $nrcpt  = 0;
            @sendto = ();
        }

        $nrcpt++;
        push @sendto, $i;
        $j = $i;
    }

    if (@sendto) {
        # Push a copy, never \@sendto itself.
        push @sendtobypacket, [@sendto];
    }

    return @sendtobypacket;
}

## Remove files that are not referenced by any packet.
# DEPRECATED: No longer used.
#sub purge_bulkspool();

# Old name: Bulk::there_is_too_much_remaining_packets().
# Tells whether too many packets are still waiting.
# Returns the number of packets remaining in the list cached by next() if
# it exceeds the 'bulk_fork_threshold' parameter, otherwise 0.
sub too_much_remaining_packets {
    my $self = shift;

    # No cached list (undef) counts as no remaining packet.
    my $remaining_packets = scalar @{$self->{_metadatas} || []};

    return 0
        unless $remaining_packets >
        Conf::get_robot_conf('*', 'bulk_fork_threshold');
    return $remaining_packets;
}
401

402
1;
__END__

=encoding utf-8

=head1 NAME

Sympa::Bulk - Spool for bulk sending

=head1 SYNOPSIS

  use Sympa::Bulk;
  my $bulk = Sympa::Bulk->new;

  $bulk->store($message, ['user@dom.ain', 'user@other.dom.ain']);

  my ($message, $handle) = $bulk->next;
sikeda's avatar
sikeda committed
419
420
421
422
423

=head1 DESCRIPTION

L<Sympa::Bulk> implements the spool for bulk sending.

424
=head2 Methods
sikeda's avatar
sikeda committed
425
426
427

=over

428
429
430
431
432
=item new ( )

I<Constructor>.
Creates new instance of L<Sympa::Bulk>.

sikeda's avatar
sikeda committed
433
434
=item next ( )

I<Instance method>.
Gets next packet to process, order is controlled by message priority, then by
packet priority, then by delivery date, then by reception date.
Packets with future delivery date are ignored.
Packet will be locked to prevent multiple processing of a single packet.

Parameters:

None.

Returns:

Two-elements list of L<Sympa::Message> instance and filehandle locking
a packet.
The message may be undefined if it could not be loaded.
If no packet is available, returns an empty list.

=item quarantine ( $handle )

452
I<Instance method>.
sikeda's avatar
sikeda committed
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
Quarantines a packet.
Packet will be moved into bad/ subdirectory of the spool.

Parameter:

=over

=item $handle

Filehandle, L<Sympa::LockedFile> instance, locking packet.

=back

Returns:

True value if packet could be quarantined.
Otherwise false value.

=item remove ( $handle )

473
I<Instance method>.
sikeda's avatar
sikeda committed
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
Removes a packet.
If the packet is the last one of bulk sending,
corresponding message will also be removed from spool.

Parameter:

=over

=item $handle

Filehandle, L<Sympa::LockedFile> instance, locking packet.

=back

Returns:

True value if packet could be removed.
Otherwise false value.

=item store ( $message, $rcpt, [ original =E<gt> $original ],
[ tag =E<gt> $tag ] )

496
I<Instance method>.
sikeda's avatar
sikeda committed
497
Stores the message into message spool.
sikeda's avatar
sikeda committed
498
Recipients will be split into multiple packets and
sikeda's avatar
sikeda committed
499
500
501
502
503
504
505
506
stored into packet spool.

Parameters:

=over

=item $message

507
Message to be stored.  Following attributes and metadata are referred:
508
509

=over
sikeda's avatar
sikeda committed
510

511
512
513
514
515
516
517
518
519
520
=item {envelope_sender}

SMTP "MAIL FROM:" field.

=item {priority}

Message priority.

=item {packet_priority}

521
Packet priority, assigned as C<sympa_packet_priority> parameter by each robot.
522
523
524
525
526

=item {date}

Unix time when the message would be delivered.

527
528
529
530
=item {time}

Unix time in floating point number when the message was stored.

531
=back
sikeda's avatar
sikeda committed
532
533
534

=item $rcpt

535
Scalar, scalarref or arrayref, for SMTP "RCPT TO:" field(s).
sikeda's avatar
sikeda committed
536
537
538

=item original =E<gt> $original

539
If the message was decrypted, stores original encrypted form.
sikeda's avatar
sikeda committed
540
541
542

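=item tag =E<gt> $tag

Tag recorded in the metadata of the stored message and used as the name of
its first packet.  Defaults to C<s>.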
sikeda's avatar
sikeda committed
544
545
546
547
548
549
550
551
552
553

=back

Returns:

If storing succeeded, a hashref with the items C<marshalled>
(marshalled metadata, i.e. file name, of the message) and
C<total_packets> (number of packets created).
Otherwise C<undef>.

=item too_much_remaining_packets ( )

554
I<Instance method>.
sikeda's avatar
sikeda committed
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
Returns true value if the number of remaining packets exceeds
the value of the C<bulk_fork_threshold> config parameter.

=back

=head1 SEE ALSO

L<bulk(8)>, L<Sympa::Mailer>, L<Sympa::Message>.

=head1 HISTORY

L<Bulk> module initially written by Serge Aumont appeared on Sympa 6.0.
It used database tables to store and fetch packets and messages.

Support for DKIM signing was added on Sympa 6.1.

Rewritten L<Sympa::Bulk> appeared on Sympa 6.2, using spools based on
filesystem.

=cut