Commit 88a70924 authored by Markus Jylhänkangas's avatar Markus Jylhänkangas
Browse files

Use transactions to improve member synchronization performance

See discussion: #1186
parent 3264df0c
...@@ -1547,6 +1547,7 @@ sub delete_list_member { ...@@ -1547,6 +1547,7 @@ sub delete_list_member {
my $total = 0; my $total = 0;
my $sdm = Sympa::DatabaseManager->instance; my $sdm = Sympa::DatabaseManager->instance;
$sdm->__dbh->begin_work;
foreach my $who (@u) { foreach my $who (@u) {
$who = Sympa::Tools::Text::canonic_email($who); $who = Sympa::Tools::Text::canonic_email($who);
...@@ -1600,6 +1601,14 @@ sub delete_list_member { ...@@ -1600,6 +1601,14 @@ sub delete_list_member {
$self->_cache_publish_expiry('member'); $self->_cache_publish_expiry('member');
delete_list_member_picture($self, shift(@u)); delete_list_member_picture($self, shift(@u));
unless ($sdm->__dbh->{AutoCommit}) {
my $rc = $sdm->__dbh->commit;
unless ($rc) {
$log->syslog('err', 'Error at delete member commit: %s', $sdm->errstr);
$sdm->__dbh->rollback;
}
}
return (-1 * $total); return (-1 * $total);
} }
...@@ -3217,6 +3226,8 @@ sub add_list_member { ...@@ -3217,6 +3226,8 @@ sub add_list_member {
my $sdm = Sympa::DatabaseManager->instance; my $sdm = Sympa::DatabaseManager->instance;
$sdm->__dbh->begin_work;
foreach my $new_user (@new_users) { foreach my $new_user (@new_users) {
my $who = Sympa::Tools::Text::canonic_email($new_user->{'email'}); my $who = Sympa::Tools::Text::canonic_email($new_user->{'email'});
unless (defined $who) { unless (defined $who) {
...@@ -3255,7 +3266,6 @@ sub add_list_member { ...@@ -3255,7 +3266,6 @@ sub add_list_member {
$new_user->{'date'} ||= time; $new_user->{'date'} ||= time;
$new_user->{'update_date'} ||= $new_user->{'date'}; $new_user->{'update_date'} ||= $new_user->{'date'};
my $custom_attribute;
if (ref $new_user->{'custom_attribute'} eq 'HASH') { if (ref $new_user->{'custom_attribute'} eq 'HASH') {
$new_user->{'custom_attribute'} = $new_user->{'custom_attribute'} =
Sympa::Tools::Data::encode_custom_attribute( Sympa::Tools::Data::encode_custom_attribute(
...@@ -3363,6 +3373,13 @@ sub add_list_member { ...@@ -3363,6 +3373,13 @@ sub add_list_member {
$self->{'add_outcome'}{'remaining_member_to_add'}--; $self->{'add_outcome'}{'remaining_member_to_add'}--;
$current_list_members_count++; $current_list_members_count++;
} }
unless ($sdm->__dbh->{AutoCommit}) {
my $rc = $sdm->__dbh->commit;
unless ($rc) {
$log->syslog('err', 'Error at add member commit: %s', $sdm->errstr);
$sdm->__dbh->rollback;
}
}
$self->_cache_publish_expiry('member'); $self->_cache_publish_expiry('member');
$self->_create_add_error_string() if ($self->{'add_outcome'}{'errors'}); $self->_create_add_error_string() if ($self->{'add_outcome'}{'errors'});
......
...@@ -367,22 +367,154 @@ sub _update_users { ...@@ -367,22 +367,154 @@ sub _update_users {
my $start_time = shift; my $start_time = shift;
return unless $ds->open; return unless $ds->open;
my $list = $ds->{context};
my $role = $ds->role;
my $sdm = Sympa::DatabaseManager->instance;
return undef unless $sdm;
my ($t, $r) =
($role eq 'member')
? ('subscriber', '')
: ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));
my $is_external_ds = not (ref $ds eq 'Sympa::DataSource::List'
and [ split /\@/, $ds->{listname}, 2 ]->[1] eq $list->{'domain'});
my %result = (added => 0, deleted => 0, updated => 0, kept => 0); my %result = (added => 0, deleted => 0, updated => 0, kept => 0);
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $start_time unless $start_time <= time;
my $user_query;
$user_query = $sdm->do_prepared_query(qq{ SELECT user_$t, inclusion_$t, inclusion_ext_$t
FROM ${t}_table
WHERE list_$t = ? AND robot_$t = ?$r
}, $list->{'name'}, $list->{'domain'});
my %users = ();
while (my @row = $user_query->fetchrow_array) {
my $user_email = $row[0];
my $user_inclusion = $row[1];
my $user_inclusion_ext = $row[2];
$users{$user_email} = [ $user_inclusion, $user_inclusion_ext ];
}
my %exclusion_list;
my $exclusion_list_query = $sdm->do_prepared_query(q{SELECT user_exclusion
FROM exclusion_table
WHERE list_exclusion = ? AND
robot_exclusion = ?},
$list->{'name'},
$list->{'domain'});
while (my @row = $exclusion_list_query->fetchrow_array) {
$exclusion_list{$row[0]} = 1;
}
my %to_be_inserted;
$sdm->__dbh->begin_work;
while (my $entry = $ds->next) { while (my $entry = $ds->next) {
my ($email, $other_value) = @$entry; my ($email, $gecos) = @$entry;
my %res = __update_user($ds, $email, $other_value, $start_time);
next unless Sympa::Tools::Text::valid_email($email);
$email = Sympa::Tools::Text::canonic_email($email);
# 1. If role of the data source is 'member' and the user is excluded:
# Do nothing.
next (none => 0) if $role eq 'member' and exists $exclusion_list{$email};
if (exists $users{$email}) {
# 2. Keep
if ($is_external_ds) {
if ($users{$email}[0] ne "" && $start_time <= int($users{$email}[0])
&& $users{$email}[1] ne "" && $start_time <= int($users{$email}[1])) {
$result{'kept'}++;
next;
}
}
else {
if ($users{$email}[0] ne "" && $start_time <= int($users{$email}[0])) {
$result{'kept'}++;
next;
}
}
# 3. Update
my %res = __update_user($ds, $email, $start_time, $list, $t, $r, $is_external_ds, $time);
unless (%res) { unless (%res) {
$ds->close; $ds->close;
$log->syslog('info', '%s: Aborted inclusion', $ds); $log->syslog('info', '%s: Aborted update', $ds);
$sdm->__dbh->rollback;
return; return;
} }
foreach my $res (keys %res) { foreach my $res (keys %res) {
$result{$res} += $res{$res} if exists $result{$res}; $result{$res} += $res{$res} if exists $result{$res};
} }
} }
else {
$to_be_inserted{$email} = $gecos;
}
}
unless ($sdm->__dbh->{AutoCommit}) {
my $rc = $sdm->__dbh->commit;
unless ($rc) {
$log->syslog('err', 'Error at update user commit: %s', $sdm->errstr);
$sdm->__dbh->rollback;
}
}
my @list_of_new_users;
for (keys %to_be_inserted) {
my $user = {
email => $_,
gecos => $to_be_inserted{$_},
subscribed => 0,
date => $time,
update_date => $time,
inclusion => $time,
($is_external_ds ? (inclusion_ext => $time) : ()),
inclusion_label => $ds->name,
};
my @defkeys = @{$ds->{_defkeys} || []};
my @defvals = @{$ds->{_defvals} || []};
@{$user}{@defkeys} = @defvals if @defkeys;
push(@list_of_new_users, $user);
$result{'added'}++;
}
# 4. Otherwise, i.e. a new user:
# INSERT new user with:
# email, gecos, subscribed=0, date, update, inclusion,
# (optional) inclusion_ext, inclusion_label and
# default attributes.
if ($role eq 'member') {
$list->add_list_member(@list_of_new_users);
foreach my $new_user (@list_of_new_users) {
if ($list->{'admin'}{'inclusion_notification_feature'} eq 'on') {
unless ($list->send_probe_to_user('welcome', $new_user->{'email'})) {
$log->syslog('err',
'Unable to send "welcome" probe to %s', $new_user->{'email'});
}
}
}
}
else {
$list->add_list_admin($role, @list_of_new_users);
}
$ds->close; $ds->close;
return %result; return %result;
...@@ -392,60 +524,17 @@ sub _update_users { ...@@ -392,60 +524,17 @@ sub _update_users {
sub __update_user { sub __update_user {
my $ds = shift; my $ds = shift;
my $email = shift; my $email = shift;
my $gecos = shift;
my $start_time = shift; my $start_time = shift;
my $list = shift;
return (none => 0) unless Sympa::Tools::Text::valid_email($email); my $t = shift;
$email = Sympa::Tools::Text::canonic_email($email); my $r = shift;
my $is_external_ds = shift;
my $list = $ds->{context}; my $time = shift;
my $role = $ds->role;
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $start_time unless $start_time <= time;
my $sdm = Sympa::DatabaseManager->instance; my $sdm = Sympa::DatabaseManager->instance;
return undef unless $sdm; return undef unless $sdm;
my $sth; my $sth;
my ($t, $r) =
($role eq 'member')
? ('subscriber', '')
: ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));
my $is_external_ds = not(ref $ds eq 'Sympa::DataSource::List'
and [split /\@/, $ds->{listname}, 2]->[1] eq $list->{'domain'});
# 1. If role of the data source is 'member' and the user is excluded:
# Do nothing.
return (none => 0)
if $role eq 'member' and $list->is_member_excluded($email);
# 2. If user has already been updated by the other data sources:
# Keep user.
if ($is_external_ds) {
return unless $sth = $sdm->do_prepared_query(
qq{SELECT COUNT(*)
FROM ${t}_table
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND ? <= inclusion_$t AND
inclusion_ext_$t IS NOT NULL AND ? <= inclusion_ext_$t},
$email, $list->{'name'}, $list->{'domain'},
$start_time,
$start_time
);
} else {
return unless $sth = $sdm->do_prepared_query(
qq{SELECT COUNT(*)
FROM ${t}_table
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND ? <= inclusion_$t},
$email, $list->{'name'}, $list->{'domain'},
$start_time
);
}
my ($count) = $sth->fetchrow_array;
$sth->finish;
return (kept => 1) if $count;
# 3. If user (has not been updated by the other data sources and) exists: # 3. If user (has not been updated by the other data sources and) exists:
# UPDATE inclusion. # UPDATE inclusion.
...@@ -491,39 +580,6 @@ sub __update_user { ...@@ -491,39 +580,6 @@ sub __update_user {
return (updated => 1) if $sth->rows; return (updated => 1) if $sth->rows;
} }
# 4. Otherwise, i.e. a new user:
# INSERT new user with:
# email, gecos, subscribed=0, date, update, inclusion,
# (optional) inclusion_ext, inclusion_label and
# default attributes.
my $user = {
email => $email,
gecos => $gecos,
subscribed => 0,
date => $time,
update_date => $time,
inclusion => $time,
($is_external_ds ? (inclusion_ext => $time) : ()),
inclusion_label => $ds->name,
};
my @defkeys = @{$ds->{_defkeys} || []};
my @defvals = @{$ds->{_defvals} || []};
@{$user}{@defkeys} = @defvals if @defkeys;
if ($role eq 'member') {
$list->add_list_member($user);
# Send notification if the list config authorizes it only.
if ($list->{'admin'}{'inclusion_notification_feature'} eq 'on') {
unless ($list->send_probe_to_user('welcome', $email)) {
$log->syslog('err',
'Unable to send "welcome" probe to %s', $email);
}
}
} else {
$list->add_list_admin($role, $user);
}
return (added => 1);
} }
sub _expire_users { sub _expire_users {
...@@ -579,12 +635,14 @@ sub _expire_users { ...@@ -579,12 +635,14 @@ sub _expire_users {
my @emails = map { $_->[0] } @{$sth->fetchall_arrayref || []}; my @emails = map { $_->[0] } @{$sth->fetchall_arrayref || []};
$sth->finish; $sth->finish;
foreach my $email (@emails) {
next unless defined $email and length $email;
if ($role eq 'member') { if ($role eq 'member') {
$list->delete_list_member(users => [$email]); $list->delete_list_member(users => \@emails);
}
else {
$list->delete_list_admin($role, \@emails);
}
foreach my $email (@emails) {
# Send notification if the list config authorizes it only. # Send notification if the list config authorizes it only.
if ($list->{'admin'}{'inclusion_notification_feature'} eq if ($list->{'admin'}{'inclusion_notification_feature'} eq
'on') { 'on') {
...@@ -594,9 +652,6 @@ sub _expire_users { ...@@ -594,9 +652,6 @@ sub _expire_users {
$email); $email);
} }
} }
} else {
$list->delete_list_admin($role, $email);
}
$deleted += 1; $deleted += 1;
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment