include.pm 18.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# -*- indent-tabs-mode: nil; -*-
# vim:ft=perl:et:sw=4
# $Id$

# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright 201X The Sympa Community. See the AUTHORS.md file at the
# top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

package Sympa::Request::Handler::include;

use strict;
use warnings;

use Sympa;
use Sympa::DatabaseManager;
use Sympa::DataSource;
use Sympa::LockedFile;
use Sympa::Log;

use base qw(Sympa::Request::Handler);

my $log = Sympa::Log->instance;

# Handler settings.
# NOTE(review): presumably consulted by the Sympa::Request::Handler base
# class; an undef _action_scenario presumably means no scenario is evaluated
# for this request — confirm in Sympa::Request::Handler.
use constant _action_scenario => undef;
use constant _context_class   => 'Sympa::List';

# List parameters describing custom attribute data sources, mapped to the
# Sympa::DataSource class that handles each of them.  Used by
# _get_data_sources() for the 'custom_attribute' role.
my %config_ca_map = (
    'include_ldap_ca'        => 'Sympa::DataSource::LDAP',
    'include_ldap_2level_ca' => 'Sympa::DataSource::LDAP2',
    'include_sql_ca'         => 'Sympa::DataSource::SQL',
);

# Parameters describing user (member, owner, editor) data sources, mapped to
# the Sympa::DataSource class that handles each of them.  _get_data_sources()
# reads them from the list config (members only) and from included config
# files.  'include_list' is the obsoleted name of 'include_sympa_list'.
my %config_user_map = (
    'include_file'              => 'Sympa::DataSource::File',
    'include_remote_file'       => 'Sympa::DataSource::RemoteFile',
    'include_list'              => 'Sympa::DataSource::List',      # Obsoleted
    'include_sympa_list'        => 'Sympa::DataSource::List',
    'include_remote_sympa_list' => 'Sympa::DataSource::RemoteDump',
    'include_ldap_query'        => 'Sympa::DataSource::LDAP',
    'include_ldap_2level_query' => 'Sympa::DataSource::LDAP2',
    'include_sql_query'         => 'Sympa::DataSource::SQL',
    'include_voot_group'        => 'Sympa::DataSource::VOOT',
);

# Builds the Sympa::DataSource objects configured for $role of $list.
#
# $list: the list whose configuration is read.
# $role: 'custom_attribute', 'member', 'owner' or anything else (treated
#        as 'editor').
#
# Custom attribute sources come from the %config_ca_map parameters of the
# list config.  Member sources come from the %config_user_map parameters of
# the list config plus the files named by 'member_include'; owner and
# editor sources come only from the files named by 'owner_include' or
# 'editor_include'.
#
# Returns an array reference of data source objects (possibly empty).
sub _get_data_sources {
    my $list = shift;
    my $role = shift;

    my @data_sources;

    if ($role eq 'custom_attribute') {
        foreach my $param (sort keys %config_ca_map) {
            my $class = $config_ca_map{$param};
            foreach my $cfg (grep {$_} @{$list->{'admin'}{$param} || []}) {
                push @data_sources,
                    Sympa::DataSource->new($class, $role,
                    context => $list, %$cfg);
            }
        }
        return [@data_sources];
    }

    my $include_param =
          ($role eq 'member') ? 'member_include'
        : ($role eq 'owner')  ? 'owner_include'
        :                       'editor_include';
    my @files = map { $list->_load_include_admin_user_file($_) }
        @{$list->{'admin'}{$include_param} || []};

    foreach my $param (sort keys %config_user_map) {
        # Only members may also be included by parameters of the list
        # config itself; listed before those found in included files.
        my @direct =
            ($role eq 'member') ? @{$list->{'admin'}{$param} || []} : ();
        my @paragraphs =
            grep {$_} (@direct, map { @{$_->{$param} || []} } @files);

        # Special case: include_file is not paragraph but a bare path.
        @paragraphs = map { +{name => $_, path => $_} } @paragraphs
            if $param eq 'include_file';

        my $class = $config_user_map{$param};
        push @data_sources, map {
            Sympa::DataSource->new($class, $role, context => $list, %$_)
        } @paragraphs;
    }

    return [@data_sources];
}

# Synchronizes the users of $request->{role} (member, owner or editor) of
# the list $request->{context} with the data sources configured for it.
#
# Returns 1 when a sync was performed; 0 when no data source is configured
# or the lock could not be obtained; undef when the database manager is
# unavailable or a query failed (the sync is then aborted).
sub _twist {
    my $self    = shift;
    my $request = shift;

    my $list = $request->{context};
    my $role = $request->{role};

    die 'bug in logic. Ask developer'
        unless grep { $role and $role eq $_ } qw(member owner editor);

    my $dss = _get_data_sources($list, $role);
    return 0 unless $dss and @$dss;

    # Get an Exclusive lock.
    my $lock_fh =
        Sympa::LockedFile->new($list->{'dir'} . '/include.' . $role, -1, '+');
    unless ($lock_fh) {
        $log->syslog('info', '%s: Locked, skip syncing', $list);
        $self->add_stash($request, 'notice', 'sync_include_skip',
            {listname => $list->{'name'}});
        return 0;
    }

    my $sdm = Sympa::DatabaseManager->instance;
    unless ($sdm) {
        # Release the lock explicitly on every early exit.
        $lock_fh->close;
        return undef;
    }
    my $sth;

    # $t: table name stem; $r: extra condition restricting admin_table rows
    # to the requested role.
    my ($t, $r) =
          ($role eq 'member')
        ? ('subscriber', '')
        : ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));

    # Start sync.  Included rows whose update_epoch is still older than
    # $sync_start after all data sources are processed are treated below as
    # no longer included.
    my $sync_start = time;
    my %result = (added => 0, deleted => 0, updated => 0, kept => 0);

    foreach my $ds (@{$dss || []}) {
        $lock_fh->extend;

        if ($ds->is_allowed_to_sync) {
            my %res = _sync_ds($ds, $sync_start);
            if (%res) {
                $log->syslog('info',
                    '%s: %d included, %d deleted, %d updated, %d kept',
                    $ds, @res{qw(added deleted updated kept)});
                $self->add_stash(
                    $request, 'notice',
                    'include',
                    {   listname => $list->{'name'},
                        id       => $ds->get_short_id,
                        name     => $ds->name,
                        result   => {%res}
                    }
                );
                foreach my $key (keys %res) {
                    $result{$key} += $res{$key} if exists $result{$key};
                }
                next;
            }
        }

        # Preserve users with failed or disallowed data sources:
        # Update update_date column of existing rows.
        my $id = $ds->get_short_id;

        unless (
            $sdm->do_prepared_query(
                qq{UPDATE ${t}_table
                   SET update_epoch_$t = ?
                   WHERE include_sources_$t LIKE ? AND
                         update_epoch_$t < ? AND
                         list_$t = ? AND robot_$t = ?$r},
                $sync_start,
                '%' . $id . '%',
                $sync_start,
                $list->{'name'}, $list->{'domain'}
            )
        ) {
            #FIXME: report error
            $self->add_stash($request, 'intern');
            $lock_fh->close;
            return undef;    # Abort sync
        }
    }

    # Remove list users not updated anymore.
    $lock_fh->extend;
    unless (
        $sth = $sdm->do_prepared_query(
            qq{SELECT user_$t AS email
               FROM ${t}_table
               WHERE (subscribed_$t IS NULL OR subscribed_$t <> 1) AND
                     included_$t = 1 AND
                     update_epoch_$t < ? AND
                     list_$t = ? AND robot_$t = ?$r},
            $sync_start,
            $list->{'name'}, $list->{'domain'}
        )
    ) {
        #FIXME: report error
        $self->add_stash($request, 'intern');
        $lock_fh->close;
        return undef;
    } else {
        my @emails = map { $_->[0] } @{$sth->fetchall_arrayref || []};
        $sth->finish;

        foreach my $email (@emails) {
            next unless defined $email and length $email;

            if ($role eq 'member') {
                $list->delete_list_member(users => [$email]);

                # Send notification if the list config authorizes it only.
                if (($list->{'admin'}{'inclusion_notification_feature'} || '')
                    eq 'on') {
                    unless (Sympa::send_file($list, 'removed', $email, {})) {
                        $log->syslog('err',
                            'Unable to send template "removed" to %s',
                            $email);
                    }
                }
            } else {
                $list->delete_list_admin($role, $email);
            }

            $result{deleted} += 1;
        }
    }

    # Rows with subscribed = 1 that were not refreshed in this run are kept,
    # but their inclusion flag and source IDs are cleared.
    unless (
        $sdm->do_prepared_query(
            qq{UPDATE ${t}_table
               SET included_$t = 0,
                   include_sources_$t = NULL
               WHERE subscribed_$t = 1 AND
                     included_$t = 1 AND
                     update_epoch_$t < ? AND
                     list_$t = ? AND robot_$t = ?$r},
            $sync_start,
            $list->{'name'}, $list->{'domain'}
        )
    ) {
        #FIXME: report error
    }

    if ($role eq 'member') {
        # Sync custom attributes.
        my $ca_dss = _get_data_sources($list, 'custom_attribute');
        foreach my $ds (@{$ca_dss || []}) {
            $lock_fh->extend;

            _sync_ds($ds) if $ds->is_allowed_to_sync;
        }
    }

    # Make data source IDs distinct.
    # Only rows listing several IDs (i.e. containing a comma) can hold
    # duplicates.  IDs of data sources no longer configured are dropped too.
    if ($sth = $sdm->do_prepared_query(
            qq{SELECT include_sources_$t AS "id"
               FROM ${t}_table
               WHERE include_sources_$t LIKE '%,%' AND
                     list_$t = ? AND robot_$t = ?$r},
            $list->{'name'}, $list->{'domain'}
        )
    ) {
        my %ids;
        my %all_ids = map { ($_->get_short_id => 1) } @{$dss || []};

        while (my $row = $sth->fetchrow_hashref('NAME_lc')) {
            my $id = $row->{id};
            next if not $id or exists $ids{$id};

            # Keep the last occurrence of each known ID, in original order.
            my %seen;
            my $new_id = join ',',
                reverse grep { !$seen{$_}++ and $all_ids{$_} }
                reverse split /\s*,\s*/, $id;
            $ids{$id} = $new_id unless $id eq $new_id;
        }
        $sth->finish;

        foreach my $id (keys %ids) {
            $sdm->do_prepared_query(
                qq{UPDATE ${t}_table
                   SET include_sources_$t = ?
                   WHERE include_sources_$t = ? AND
                         list_$t = ? AND robot_$t = ?$r},
                ($ids{$id} || undef), $id,
                $list->{'name'}, $list->{'domain'}
            );
        }
    }

    # Special treatment for Sympa::DataSource::List.
    _clean_inclusion_table($list, $role, $sync_start);

    # Release lock.
    $lock_fh->close;

    $log->syslog(
        'info',   '%s: %d included, %d deleted, %d updated',
        $request, @result{qw(added deleted updated)}
    );
    $self->add_stash($request, 'notice', 'include_performed',
        {listname => $list->{'name'}, result => {%result}});

    return 1;
}

# Reads every entry of the data source $ds and synchronizes it with the
# list, dispatching to _sync_ds_ca() for custom attribute sources and to
# _sync_ds_user() otherwise.
#
# $ds:         an opened-on-demand data source object.
# $sync_start: epoch at which the current sync began (not passed on for
#              custom attribute sources).
#
# Returns a hash-like list of counters (added, deleted, updated, kept), or
# an empty list when the source cannot be opened or an entry fails.
sub _sync_ds {
    my $ds         = shift;
    my $sync_start = shift;

    return unless $ds->open;

    my %result = (added => 0, deleted => 0, updated => 0, kept => 0);
    while (my $entry = $ds->next) {
        my ($email, $other_value) = @$entry;
        my %res =
            ($ds->role eq 'custom_attribute')
            ? _sync_ds_ca($ds, $email, $other_value)
            : _sync_ds_user($ds, $email, $other_value, $sync_start);

        # An empty result signals failure: abort the whole data source.
        unless (%res) {
            $ds->close;
            $log->syslog('info', '%s: Aborted sync', $ds);
            return;
        }
        foreach my $res (keys %res) {
            $result{$res} += $res{$res} if exists $result{$res};
        }
    }
    unless ($ds->role eq 'custom_attribute') {
        # Special treatment for Sympa::DataSource::List.
        _update_inclusion_table($ds, $sync_start)
            if ref $ds eq 'Sympa::DataSource::List';
    }

    $ds->close;

    return %result;
}

# Internal function.
# Merges custom attribute values read from a data source into the custom
# attributes of one list member.
#
# $ds:        data source; its {context} is the list.
# $email:     address of the member.
# $ca_update: hash reference of attribute name => new value.
#
# Returns (none => 0) when $email is not a member of the list, (kept => 1)
# when nothing changed, and (updated => 1) after storing the merged
# attributes.
sub _sync_ds_ca {
    my $ds        = shift;
    my $email     = shift;
    my $ca_update = shift;

    my $list   = $ds->{context};
    my $member = $list->get_list_member($email);
    # Skip non-members: an empty result would abort the whole data source.
    return (none => 0) unless $member;
    my $ca = $member->{custom_attribute} || {};

    my $changed;
    foreach my $key (sort keys %{$ca_update || {}}) {
        my $cur = $ca->{$key};
        $cur = '' unless defined $cur;
        my $new = $ca_update->{$key};
        $new = '' unless defined $new;
        next if $cur eq $new;

        $ca->{$key} = $new;
        $changed = 1;
    }
    return (kept => 1) unless $changed;

    # Store the merged set so attributes not provided by this data source
    # are preserved.
    $list->update_list_member($email, custom_attribute => $ca);

    return (updated => 1);
}

# Internal function.
# Synchronizes one user ($email, with $gecos) read from the data source $ds
# into the table for the data source's role.
#
# Returns (none => 0) when the entry is skipped (invalid address or
# excluded member), (kept => 1), (updated => 1) or (added => 1) according
# to what was done, and an empty list when the database manager is
# unavailable or a query fails.
sub _sync_ds_user {
    my $ds         = shift;
    my $email      = shift;
    my $gecos      = shift;
    my $sync_start = shift;

    return (none => 0) unless Sympa::Tools::Text::valid_email($email);
    $email = Sympa::Tools::Text::canonic_email($email);

    my $list = $ds->{context};
    my $role = $ds->role;
    my $id   = $ds->get_short_id;

    my $time = time;
    # Avoid retrace of clock e.g. by outage of NTP server.
    $time = $sync_start unless $sync_start <= $time;

    my $sdm = Sympa::DatabaseManager->instance;
    # Return an empty list, not (undef): the caller treats an empty result
    # as failure, whereas (undef) would build a non-empty hash.
    return unless $sdm;
    my $sth;
    my ($t, $r) =
          ($role eq 'member')
        ? ('subscriber', '')
        : ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));

    # 1. If role of the data source is 'member' and the user is excluded:
    #    Do nothing.
    return (none => 0)
        if $role eq 'member' and $list->is_member_excluded($email);

    # 2. If user not subscribed and already updated by the other data sources:
    #    UPDATE included=1, id.
    return unless $sth = $sdm->do_prepared_query(
        qq{UPDATE ${t}_table
           SET included_$t = 1,
               include_sources_$t = concat_ws(',', include_sources_$t, ?)
           WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
                 (subscribed_$t IS NULL OR subscribed_$t <> 1) AND
                 ? <= update_epoch_$t},
        $id,
        $email, $list->{'name'}, $list->{'domain'},
        $sync_start
    );
    return (kept => 1) if $sth->rows;

    # 3. If
    #    (a) user is subscribed, or
    #    (b) not subscribed and not yet updated by the other data sources:
    #    UPDATE included=1, id, update_date
    return unless $sth = $sdm->do_prepared_query(
        qq{UPDATE ${t}_table
           SET included_$t = 1,
               include_sources_$t = concat_ws(',', include_sources_$t, ?),
               update_epoch_$t = ?
           WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
                 (
                  subscribed_$t = 1 OR
                  ((subscribed_$t IS NULL OR subscribed_$t <> 1) AND
                   update_epoch_$t < ?)
                 )},
        $id,
        $time,
        $email, $list->{'name'}, $list->{'domain'},
        $sync_start
    );
    return (updated => 1) if $sth->rows;

    # 4. Otherwise, i.e. a new user:
    #    INSERT new user with:
    #    email, gecos, subscribed=0, included=1, id, default attributes,
    #    update_date.
    my $user = {
        email       => $email,
        gecos       => $gecos,
        subscribed  => 0,
        included    => 1,
        id          => $id,
        update_date => $time,
    };
    # Default attribute values carried by the data source object.
    my @defkeys = @{$ds->{_defkeys} || []};
    my @defvals = @{$ds->{_defvals} || []};
    @{$user}{@defkeys} = @defvals if @defkeys;

    if ($role eq 'member') {
        $list->add_list_member($user);

        # Send notification if the list config authorizes it only.
        if (($list->{'admin'}{'inclusion_notification_feature'} || '') eq
            'on') {
            unless ($list->send_probe_to_user('welcome', $email)) {
                $log->syslog('err',
                    'Unable to send "welcome" probe to %s', $email);
            }
        }
    } else {
        $list->add_list_admin($role, $user);
    }
    return (added => 1);
}

# Update inclusion_table: This feature was added on 6.2.16.
# Related only to Sympa::DataSource::List class.
# Old name: (part of) Sympa::List::_update_inclusion_table().
#
# Records in inclusion_table that the list $ds->{context} includes users of
# $ds->role from the list named $ds->{listname}.  The existing row's
# update_epoch_inclusion is refreshed; if no row was updated, a new row is
# inserted.
#
# Returns 1 on success, undef on failure (logged).
sub _update_inclusion_table {
    my $ds         = shift;
    my $sync_start = shift;

    my $list   = $ds->{context};
    my $role   = $ds->role;
    my $inlist = Sympa::List->new($ds->{listname});
    unless ($inlist) {
        # Without this check, $inlist->get_id below would die.
        $log->syslog('err', 'Unable to load list %s', $ds->{listname});
        return undef;
    }

    my $time = time;
    # Avoid retrace of clock e.g. by outage of NTP server.
    $time = $sync_start unless $sync_start <= $time;

    my $sdm = Sympa::DatabaseManager->instance;
    return undef unless $sdm;
    my $sth;

    # UPDATE an existing row first; fall back to INSERT when none changed.
    unless (
        $sth = $sdm->do_prepared_query(
            q{UPDATE inclusion_table
              SET update_epoch_inclusion = ?
              WHERE target_inclusion = ? AND
                    role_inclusion = ? AND
                    source_inclusion = ? AND
                    (update_epoch_inclusion IS NULL OR
                     update_epoch_inclusion < ?)},
            $time, $list->get_id, $role, $inlist->get_id, $time
        )
        and $sth->rows
        or $sth = $sdm->do_prepared_query(
            q{INSERT INTO inclusion_table
              (target_inclusion, role_inclusion, source_inclusion,
               update_epoch_inclusion)
              VALUES (?, ?, ?, ?)},
            $list->get_id, $role, $inlist->get_id, $time
        )
        and $sth->rows
    ) {
        $log->syslog('err', 'Unable to update list %s in database', $list);
        return undef;
    }

    return 1;
}

# Old name: (part of) Sympa::List::_update_inclusion_table().
# Related only to Sympa::DataSource::List class.
#
# Deletes the inclusion_table rows targeting $list for $role whose
# update_epoch_inclusion is older than $sync_start.
# Returns whatever do_prepared_query() returns, or the false database
# manager value when none is available.
sub _clean_inclusion_table {
    my ($list, $role, $sync_start) = @_;

    my $sdm = Sympa::DatabaseManager->instance;
    return $sdm unless $sdm;

    return $sdm->do_prepared_query(
        q{DELETE FROM inclusion_table
          WHERE target_inclusion = ? AND role_inclusion = ? AND
                update_epoch_inclusion < ?},
        $list->get_id, $role, $sync_start
    );
}

# Enforce uniqueness in a comma separated list of user source IDs.
# Old name: (part of) Sympa::List::add_source_id().
# No longer used.
#sub _add_source_id;

# Returns the value stored under the 'context' key of the invocant; no ID
# is computed here.
# NOTE(review): presumably used to identify this handler (e.g. when it is
# logged) — confirm against callers.
sub get_id {
    my $self = shift;
    return $self->{context};
}

1;
__END__

=encoding utf-8

=head1 NAME

Sympa::Request::Handler::include - include request handler

=head1 DESCRIPTION

Includes users from data sources to a list.

Opens data sources, synchronizes list users with each of them and closes.
TBD.

=head1 SEE ALSO

L<Sympa::DataSource>, L<Sympa::List>.

L<"admin_table"|sympa_database(5)/"admin_table">,
L<"exclusion_table"|sympa_database(5)/"exclusion_table">,
L<"inclusion_table"|sympa_database(5)/"inclusion_table"> and
L<"subscriber_table"|sympa_database(5)/"subscriber_table">
in L<sympa_database(5)>.

=head1 HISTORY

The feature to include subscribers from data sources was introduced on
Sympa 3.3.6b.4.
Inclusion of owners and moderators was introduced on Sympa 4.2b.5.

L<Datasource> module appeared on Sympa 5.3a.9.
Entirely rewritten and renamed L<Sympa::DataSource> module and
L<Sympa::Request::Handler::include> module appeared on Sympa 6.2.XX.

=cut