Commit bf7661f2 authored by IKEDA Soji's avatar IKEDA Soji
Browse files

Refactoring.

parent 8c3fdd27
......@@ -57,6 +57,7 @@ my %config_user_map = (
'include_voot_group' => 'Sympa::DataSource::VOOT',
);
# Internal function.
sub _get_data_sources {
my $list = shift;
my $role = shift;
......@@ -134,6 +135,27 @@ sub _twist {
return 0;
}
# I. Start.
my (%start_times, $last_start_time, $start_time);
seek $lock_fh, 0, 0;
while (my $line = <$lock_fh>) {
next unless $line =~ /\A(\w+)\s+(\d+)/;
my $t = $2 + 0;
$start_times{$1} = $t;
$last_start_time = $t
if not defined $last_start_time or $t < $last_start_time;
}
$start_time = time;
if (defined $last_start_time and $start_time < $last_start_time) {
# Avoid retrace of clock e.g. by outage of NTP server.
$log->syslog('info', '%s: Clock got behind, skip inclusion', $list);
$self->add_stash($request, 'notice', 'include_skip',
{listname => $list->{'name'}});
return 0;
}
my $sdm = Sympa::DatabaseManager->instance;
return undef unless $sdm;
my $sth;
......@@ -142,15 +164,6 @@ sub _twist {
? ('subscriber', '')
: ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));
# I. Start.
my %sync_starts;
seek $lock_fh, 0, 0;
while (my $line = <$lock_fh>) {
$sync_starts{$1} = $2 + 0 if $line =~ /\A(\w+)\s+(\d+)/;
}
my $sync_start = time;
# II. Include new entries.
my %result = (added => 0, deleted => 0, updated => 0, kept => 0);
......@@ -158,11 +171,15 @@ sub _twist {
$lock_fh->extend;
next unless $ds->is_allowed_to_sync;
my %res = _sync_ds($ds, $sync_start);
my %res = _update_users($ds, $start_time);
next unless %res;
# Update time of allowed and succeeded data sources.
$sync_starts{$ds->get_short_id} = $sync_start;
$start_times{$ds->get_short_id} = $start_time;
# Special treatment for Sympa::DataSource::List.
_update_inclusion_table($ds, $start_time)
if ref $ds eq 'Sympa::DataSource::List';
$log->syslog(
'info', '%s: %d included, %d deleted, %d updated, %d kept',
......@@ -186,101 +203,42 @@ sub _twist {
# Choose most earlier time of succeeding inclusions (if any of
# data sources have not succeeded yet, Unix epoch will be chosen).
my $last_start = $sync_start;
$last_start_time = $start_time;
foreach my $id (map { $_->get_short_id } @$dss) {
unless (defined $sync_starts{$id}) {
undef $last_start;
unless (defined $start_times{$id}) {
undef $last_start_time;
last;
} elsif ($sync_starts{$id} < $last_start) {
$last_start = $sync_starts{$id};
} elsif ($start_times{$id} < $last_start_time) {
$last_start_time = $start_times{$id};
}
}
if (defined $last_start) {
# Remove list users not subscribing (only included) and
# not included anymore.
if (defined $last_start_time) {
$lock_fh->extend;
unless (
$sth = $sdm->do_prepared_query(
qq{SELECT user_$t AS email
FROM ${t}_table
WHERE (subscribed_$t IS NULL OR subscribed_$t <> 1) AND
inclusion_$t IS NOT NULL AND inclusion_$t < ? AND
list_$t = ? AND robot_$t = ?$r},
$last_start,
$list->{'name'}, $list->{'domain'}
)
) {
#FIXME: report error
my %res = _expire_users($list, $role, $last_start_time);
unless (%res) {
$self->add_stash($request, 'intern');
#FIMXE: Report error.
return undef;
} else {
my @emails = map { $_->[0] } @{$sth->fetchall_arrayref || []};
$sth->finish;
foreach my $email (@emails) {
next unless defined $email and length $email;
if ($role eq 'member') {
$list->delete_list_member(users => [$email]);
# Send notification if the list config authorizes it only.
if ($list->{'admin'}{'inclusion_notification_feature'} eq
'on') {
unless (
Sympa::send_file($list, 'removed', $email, {})) {
$log->syslog('err',
'Unable to send template "removed" to %s',
$email);
}
}
} else {
$list->delete_list_admin($role, $email);
}
$result{deleted} += 1;
}
}
# Cancel inclusion of users subscribing (and also included) and
# not included anymore.
unless (
$sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = NULL, inclusion_ext_$t = NULL
WHERE subscribed_$t = 1 AND
inclusion_$t IS NOT NULL AND inclusion_$t < ? AND
list_$t = ? AND robot_$t = ?$r},
$last_start,
$list->{'name'}, $list->{'domain'}
)
and $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_ext_$t = NULL
WHERE subscribed_$t = 1 AND
inclusion_ext_$t IS NOT NULL AND
inclusion_ext_$t < ? AND
list_$t = ? AND robot_$t = ?$r},
$last_start,
$list->{'name'}, $list->{'domain'}
)
) {
#FIXME: report error
foreach my $key (keys %res) {
$result{$key} += $res{$key} if exists $result{$key};
}
# Special treatment for Sympa::DataSource::List.
_clean_inclusion_table($list, $role, $last_start);
_expire_inclusion_table($list, $role, $last_start_time);
}
# IV. Update custom attributes.
if ($role eq 'member') {
my $dss = _get_data_sources($list, 'custom_attribute');
if ($dss and @$dss) {
foreach my $ds (@{$dss || []}) {
$lock_fh->extend;
foreach
my $ds (@{_get_data_sources($list, 'custom_attribute') || []}) {
next unless $ds->is_allowed_to_sync;
_sync_ds($ds) if $ds->is_allowed_to_sync;
}
$lock_fh->extend;
_update_custom_attribute($ds);
}
}
......@@ -294,8 +252,8 @@ sub _twist {
return undef;
}
foreach my $id (map { $_->get_short_id } @$dss) {
printf $ofh "%s %d\n", $id, $sync_starts{$id}
if defined $sync_starts{$id};
printf $ofh "%s %d\n", $id, $start_times{$id}
if defined $start_times{$id};
}
close $ofh;
unlink $lock_file . '.old';
......@@ -316,34 +274,27 @@ sub _twist {
return 1;
}
sub _sync_ds {
# Internal function.
sub _update_users {
my $ds = shift;
my $sync_start = shift;
my $start_time = shift;
return unless $ds->open;
my %result = (added => 0, deleted => 0, updated => 0, kept => 0);
while (my $entry = $ds->next) {
my ($email, $other_value) = @$entry;
my %res =
($ds->role eq 'custom_attribute')
? _sync_ds_ca($ds, $email, $other_value)
: _sync_ds_user($ds, $email, $other_value, $sync_start);
my %res = __update_user($ds, $email, $other_value, $start_time);
unless (%res) {
$ds->close;
$log->syslog('info', '%s: Aborted sync', $ds);
$log->syslog('info', '%s: Aborted inclusion', $ds);
return;
}
foreach my $res (keys %res) {
$result{$res} += $res{$res} if exists $result{$res};
}
}
unless ($ds->role eq 'custom_attribute') {
# Special treatment for Sympa::DataSource::List.
_update_inclusion_table($ds, $sync_start)
if ref $ds eq 'Sympa::DataSource::List';
}
$ds->close;
......@@ -351,40 +302,11 @@ sub _sync_ds {
}
# Internal function.
sub _sync_ds_ca {
my $ds = shift;
my $email = shift;
my $ca_update = shift;
my $list = $ds->{context};
my $member = $list->get_list_member($email);
return unless $member;
my $ca = $member->{custom_attribute} || {};
my $changed;
foreach my $key (sort keys %{$ca_update || {}}) {
my $cur = $ca->{$key};
$cur = '' unless defined $cur;
my $new = $ca_update->{$key};
$new = '' unless defined $new;
next if $cur eq $new;
$ca->{$key} = $new;
$changed = 1;
}
return (kept => 1) unless $changed;
$list->update_list_member($email, custom_attribute => $ca_update);
return (updated => 1);
}
# Internal function.
sub _sync_ds_user {
sub __update_user {
my $ds = shift;
my $email = shift;
my $gecos = shift;
my $sync_start = shift;
my $start_time = shift;
return (none => 0) unless Sympa::Tools::Text::valid_email($email);
$email = Sympa::Tools::Text::canonic_email($email);
......@@ -394,7 +316,7 @@ sub _sync_ds_user {
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $sync_start unless $sync_start <= time;
$time = $start_time unless $start_time <= time;
my $sdm = Sympa::DatabaseManager->instance;
return undef unless $sdm;
......@@ -420,8 +342,9 @@ sub _sync_ds_user {
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND ? <= inclusion_$t AND
inclusion_ext_$t IS NOT NULL AND ? <= inclusion_ext_$t},
$email, $list->{'name'}, $list->{'domain'},
$sync_start, $sync_start
$email, $list->{'name'}, $list->{'domain'},
$start_time,
$start_time
);
} else {
return unless $sth = $sdm->do_prepared_query(
......@@ -430,7 +353,7 @@ sub _sync_ds_user {
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND ? <= inclusion_$t},
$email, $list->{'name'}, $list->{'domain'},
$sync_start
$start_time
);
}
my ($count) = $sth->fetchrow_array;
......@@ -491,12 +414,95 @@ sub _sync_ds_user {
return (added => 1);
}
sub _expire_users {
my $list = shift;
my $role = shift;
my $last_start_time = shift;
my $sdm = Sympa::DatabaseManager->instance;
return unless $sdm;
my $sth;
my ($t, $r) =
($role eq 'member')
? ('subscriber', '')
: ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));
my $deleted = 0;
# Remove list users not subscribing (only included) and
# not included anymore.
unless (
$sth = $sdm->do_prepared_query(
qq{SELECT user_$t AS email
FROM ${t}_table
WHERE (subscribed_$t IS NULL OR subscribed_$t <> 1) AND
inclusion_$t IS NOT NULL AND inclusion_$t < ? AND
list_$t = ? AND robot_$t = ?$r},
$last_start_time,
$list->{'name'}, $list->{'domain'}
)
) {
return;
} else {
my @emails = map { $_->[0] } @{$sth->fetchall_arrayref || []};
$sth->finish;
foreach my $email (@emails) {
next unless defined $email and length $email;
if ($role eq 'member') {
$list->delete_list_member(users => [$email]);
# Send notification if the list config authorizes it only.
if ($list->{'admin'}{'inclusion_notification_feature'} eq
'on') {
unless (Sympa::send_file($list, 'removed', $email, {})) {
$log->syslog('err',
'Unable to send template "removed" to %s',
$email);
}
}
} else {
$list->delete_list_admin($role, $email);
}
$deleted += 1;
}
}
# Cancel inclusion of users subscribing (and also included) and
# not included anymore.
unless (
$sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = NULL, inclusion_ext_$t = NULL
WHERE subscribed_$t = 1 AND
inclusion_$t IS NOT NULL AND inclusion_$t < ? AND
list_$t = ? AND robot_$t = ?$r},
$last_start_time,
$list->{'name'}, $list->{'domain'}
)
and $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_ext_$t = NULL
WHERE subscribed_$t = 1 AND
inclusion_ext_$t IS NOT NULL AND inclusion_ext_$t < ? AND
list_$t = ? AND robot_$t = ?$r},
$last_start_time,
$list->{'name'}, $list->{'domain'}
)
) {
#FIXME: report error
}
return (deleted => $deleted);
}
# Internal function.
# Update inclusion_table: This feature was added on 6.2.16.
# Related only to Sympa::DataSource::List class.
# Old name: (part of) Sympa::List::_update_inclusion_table().
sub _update_inclusion_table {
my $ds = shift;
my $sync_start = shift;
my $start_time = shift;
my $list = $ds->{context};
my $role = $ds->role;
......@@ -504,7 +510,7 @@ sub _update_inclusion_table {
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $sync_start unless $sync_start <= $time;
$time = $start_time unless $start_time <= $time;
my $sdm = Sympa::DatabaseManager->instance;
return undef unless $sdm;
......@@ -538,22 +544,64 @@ sub _update_inclusion_table {
return 1;
}
# Internal function.
# Old name: (part of) Sympa::List::_update_inclusion_table().
# Related only to Sympa::DataSource::List class.
sub _clean_inclusion_table {
my $list = shift;
my $role = shift;
my $sync_start = shift;
sub _expire_inclusion_table {
my $list = shift;
my $role = shift;
my $last_start_time = shift;
my $sdm = Sympa::DatabaseManager->instance;
$sdm and $sdm->do_prepared_query(
q{DELETE FROM inclusion_table
WHERE target_inclusion = ? AND role_inclusion = ? AND
update_epoch_inclusion < ?},
$list->get_id, $role, $sync_start
$list->get_id, $role,
$last_start_time
);
}
# Internal function.
sub _update_custom_attribute {
my $ds = shift;
die 'bug in logic. Ask developer' unless $ds->role eq 'custom_attribute';
return unless $ds->open;
my $list = $ds->{context};
my $updated = 0;
while (my $entry = $ds->next) {
my ($email, $ca_update) = @$entry;
my $member = $list->get_list_member($email);
next unless $member;
my $ca = $member->{custom_attribute} || {};
my $changed;
foreach my $key (sort keys %{$ca_update || {}}) {
my $cur = $ca->{$key};
$cur = '' unless defined $cur;
my $new = $ca_update->{$key};
$new = '' unless defined $new;
next if $cur eq $new;
$ca->{$key} = $new;
$changed = 1;
}
next unless $changed;
$list->update_list_member($email, custom_attribute => $ca_update);
$updated++;
}
$ds->close;
return (updated => $updated);
}
# Enforce uniqueness in a comma separated list of user source ID's.
# Old name: (part of) Sympa::List::add_source_id().
# No longer used.
......@@ -577,7 +625,7 @@ Sympa::Request::Hander::include - include request handler
Includes users from data sources to a list.
Opens data sources, synchronizes list users with each of them and closes.
Opens data sources, include or update list users with each of them and closes.
TBD.
=head1 SEE ALSO
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment