Unverified Commit 17e1a6ed authored by IKEDA Soji's avatar IKEDA Soji Committed by GitHub
Browse files

Merge pull request #1398 from ikedas/issue-1186_rev by ikedas

Revert changes for #1186 "Improving data source synchronization performance"
parents 9394a80b 55f99acb
......@@ -8,7 +8,7 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2017, 2018, 2019, 2021 The Sympa Community. See the
# Copyright 2017, 2018, 2019, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
......@@ -108,13 +108,6 @@ sub connect {
$self);
return 1;
}
# Disconnected: Transaction (if any) was aborted.
if (delete $self->{_sdbTransactionLevel}) {
$log->syslog('err', 'Transaction on database %s was aborted: %s',
$self, $DBI::errstr);
$self->set_persistent($self->{_sdbPrevPersistency});
return undef;
}
# Do we have required parameters?
foreach my $param (@{$self->required_parameters}) {
......@@ -413,59 +406,6 @@ sub prepare_query_log_values {
# DEPRECATED: Use tools::eval_in_time() and fetchall_arrayref().
#sub fetch();
# As most of DBMS do not support nested transactions, these are not
# effective during when {_sdbTransactionLevel} attribute is
# positive, i.e. only the outermost transaction will be available.
sub begin {
my $self = shift;
$self->{_sdbTransactionLevel} //= 0;
if ($self->{_sdbTransactionLevel}++) {
return 1;
}
my $dbh = $self->__dbh;
return undef unless $dbh;
$dbh->begin_work or die $DBI::errstr;
$self->{_sdbPrevPersistency} = $self->set_persistent(0);
return 1;
}
sub commit {
my $self = shift;
unless ($self->{_sdbTransactionLevel}) {
die 'bug in logic. Ask developer';
}
if (--$self->{_sdbTransactionLevel}) {
return 1;
}
my $dbh = $self->__dbh;
return undef unless $dbh;
$self->set_persistent($self->{_sdbPrevPersistency});
return $dbh->commit;
}
sub rollback {
my $self = shift;
unless ($self->{_sdbTransactionLevel}) {
die 'bug in logic. Ask developer';
}
if (--$self->{_sdbTransactionLevel}) {
return 1;
}
my $dbh = $self->__dbh;
return undef unless $dbh;
$self->set_persistent($self->{_sdbPrevPersistency});
return $dbh->rollback;
}
sub disconnect {
my $self = shift;
......@@ -621,16 +561,6 @@ TBD.
I<Constructor>.
Creates new database instance.
=item begin ( )
I<Instance method>, I<only for SQL>.
Begin transaction.
=item commit ( )
I<Instance method>, I<only for SQL>.
Commit transaction.
=item do_operation ( $operation, options... )
I<Instance method>, I<only for LDAP>.
......@@ -661,11 +591,6 @@ Returns:
Statement handle (L<DBI::st> object or such), or C<undef>.
=item rollback ( )
I<Instance method>, I<only for SQL>.
Rollback transaction.
=back
=head1 SEE ALSO
......
......@@ -8,7 +8,7 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2018, 2021 The Sympa Community. See the
# Copyright 2018, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
......@@ -565,14 +565,6 @@ provided by L<Sympa::Database> class:
=over
=item begin ( )
I<Overridable>, I<only for SQL driver>.
=item commit ( )
I<Overridable>, I<only for SQL driver>.
=item do_operation ( $operation, $parameters, ...)
I<Overridable>, I<only for LDAP driver>.
......@@ -585,10 +577,6 @@ I<Overridable>, I<only for SQL driver>.
I<Overridable>, I<only for SQL driver>.
=item rollback ( )
I<Overridable>, I<only for SQL driver>.
=item AS_DOUBLE ( $value )
I<Overridable>.
......
......@@ -8,7 +8,7 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2018, 2021 The Sympa Community. See the
# Copyright 2018, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
......@@ -521,11 +521,9 @@ sub translate_type {
}
# Note:
# - To prevent "database is locked" error, acquire "immediate" lock
# by each query. Most queries excluding "SELECT" need to lock in this
# manner.
# - If a transaction has been begun, lock is not needed, because SQLite
# does not support nested transactions.
# To prevent "database is locked" error, acquire "immediate" lock
# by each query. Most queries excluding "SELECT" need to lock in this
# manner.
sub do_query {
my $self = shift;
my $sth;
......@@ -533,8 +531,8 @@ sub do_query {
my $need_lock =
($_[0] =~
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i)
unless $self->{_sdbTransactionLevel};
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i
);
## acquire "immediate" lock
unless (!$need_lock or $self->__dbh->begin_work) {
......@@ -573,8 +571,8 @@ sub do_prepared_query {
my $need_lock =
($_[0] =~
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i)
unless $self->{_sdbTransactionLevel};
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i
);
## acquire "immediate" lock
unless (!$need_lock or $self->__dbh->begin_work) {
......
......@@ -8,7 +8,7 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2017, 2018, 2019, 2020, 2021 The Sympa Community. See the
# Copyright 2017, 2018, 2019, 2020, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
......@@ -1551,8 +1551,6 @@ sub delete_list_member {
my $sdm = Sympa::DatabaseManager->instance;
my $sth;
$sdm->begin;
foreach my $who (@$users) {
next unless defined $who and length $who;
$who = Sympa::Tools::Text::canonic_email($who);
......@@ -1618,12 +1616,6 @@ sub delete_list_member {
$total--;
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at delete member commit: %s', $sdm->error);
$sdm->rollback;
return 0;
}
$self->_cache_publish_expiry('member');
return (-1 * $total);
......@@ -1644,8 +1636,6 @@ sub delete_list_admin {
my $sdm = Sympa::DatabaseManager->instance;
my $sth;
$sdm->begin;
$users = [$users] unless ref $users; # compat.
foreach my $who (@$users) {
next unless defined $who and length $who;
......@@ -1683,12 +1673,6 @@ sub delete_list_admin {
$total--;
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at add member commit: %s', $sdm->error);
$sdm->rollback;
return 0;
}
$self->_cache_publish_expiry('admin_user');
return (-1 * $total);
......@@ -3111,7 +3095,6 @@ sub add_list_member {
my %map_field = _map_list_member_cols();
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $u (@users) {
unless (Sympa::Tools::Text::valid_email($u->{email})) {
......@@ -3301,11 +3284,6 @@ sub add_list_member {
$current_list_members_count++;
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at add member commit: %s', $sdm->error);
$sdm->rollback;
}
$self->_cache_publish_expiry('member');
push @$stash_ref, ['notice', 'add_performed', {total => $added_members}]
......@@ -3331,20 +3309,10 @@ sub add_list_admin {
my $stash_ref = $options{stash} || [];
my $total = 0;
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $user (@users) {
$total++ if $self->_add_list_admin($role, $user, %options);
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at add admin commit: %s', $sdm->error);
$sdm->rollback;
return 0;
}
$self->_cache_publish_expiry('admin_user') if $total;
push @$stash_ref, ['notice', 'add_performed', {total => $total}]
......
......@@ -4,7 +4,7 @@
# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright 2019, 2020, 2021 The Sympa Community. See the
# Copyright 2019, 2020, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
......@@ -382,107 +382,120 @@ sub _update_users {
return unless $sdm;
my $sth;
my %result = (added => 0, deleted => 0, updated => 0, kept => 0);
my %users;
my ($t, $r) =
($role eq 'member')
? ('subscriber', '')
: ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));
return unless $sth = $sdm->do_prepared_query(
qq{
SELECT user_$t, inclusion_$t, inclusion_ext_$t
FROM ${t}_table
WHERE list_$t = ? AND robot_$t = ?$r},
$list->{'name'}, $list->{'domain'}
);
while (my @row = $sth->fetchrow_array) {
my $user_email = Sympa::Tools::Text::canonic_email($row[0]);
my $user_inclusion = $row[1];
my $user_inclusion_ext = $row[2];
$users{$user_email} = {
inclusion => $user_inclusion,
inclusion_ext => $user_inclusion_ext
};
}
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $start_time unless $start_time <= time;
my %exclusion_list =
map { (Sympa::Tools::Text::canonic_email($_) => 1) }
@{$list->get_exclusion->{emails}}
if $role eq 'member';
my @to_be_inserted;
$sdm->begin;
my %result = (added => 0, deleted => 0, updated => 0, kept => 0);
while (my $entry = $ds->next) {
my $email = $entry->[0];
$email = Sympa::Tools::Text::canonic_email($email)
if Sympa::Tools::Text::valid_email($email);
my ($email, $gecos) = @$entry;
# 1. If role of the data source is 'member' and the user is excluded:
# Do nothing.
next if $role eq 'member' and exists $exclusion_list{$email};
# 4. If the user is not a member, i.e. a new user:
# INSERT new user (see below).
unless (exists $users{$email}) {
push @to_be_inserted, $entry;
next;
}
# 2. If user has already been updated by the other data sources:
# Keep user.
my ($inclusion, $inclusion_ext) =
($users{$email}->{inclusion}, $users{$email}->{inclusion_ext});
if ($ds->is_external) {
if ( defined $inclusion
and $start_time <= int($inclusion)
and defined $inclusion_ext
and $start_time <= int($inclusion_ext)) {
$result{kept}++;
next;
}
return unless $sth = $sdm->do_prepared_query(
qq{SELECT COUNT(*)
FROM ${t}_table
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND
? <= inclusion_$t AND
inclusion_ext_$t IS NOT NULL AND
? <= inclusion_ext_$t},
$email, $list->{'name'}, $list->{'domain'},
$start_time,
$start_time
);
} else {
if (defined $inclusion and $start_time <= int($inclusion)) {
$result{kept}++;
next;
}
return unless $sth = $sdm->do_prepared_query(
qq{SELECT COUNT(*)
FROM ${t}_table
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND
? <= inclusion_$t},
$email, $list->{'name'}, $list->{'domain'},
$start_time
);
}
my ($count) = $sth->fetchrow_array;
$sth->finish;
if ($count) {
$result{kept}++;
next;
}
# 3. If user (has not been updated by the other data sources and)
# exists:
# UPDATE inclusion.
my %res = __update_user($ds, $email, $start_time);
if ($ds->is_external) {
# Already updated by the other non-external data source but not
# yet by any other external ones:
# Update inclusion_ext (and inclusion) field, but not
# inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?, inclusion_ext_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND ? <= inclusion_$t},
$time, $time,
$email, $list->{'name'}, $list->{'domain'},
$start_time
);
if ($sth->rows) {
next;
}
unless (%res) {
$log->syslog('info', '%s: Aborted update', $ds);
$sdm->rollback;
$ds->close;
return;
# Not yet updated by any other data sources:
# Update inclusion_ext (and inclusion), and assign
# inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?, inclusion_ext_$t = ?,
inclusion_label_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r},
$time, $time,
$ds->name,
$email, $list->{'name'}, $list->{'domain'}
);
if ($sth->rows) {
$result{updated}++;
next;
}
} else {
# Not yet updated by any other data sources:
# Update inclusion, and assign inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?,
inclusion_label_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r},
$time,
$ds->name,
$email, $list->{'name'}, $list->{'domain'}
);
if ($sth->rows) {
$result{updated}++;
next;
}
}
$result{updated} += $res{updated} if $res{updated};
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at update user commit: %s', $sdm->error);
$sdm->rollback;
$ds->close;
return;
}
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $start_time unless $start_time <= time;
# INSERT new user with:
# email, gecos, subscribed=0, date, update, inclusion,
# (optional) inclusion_ext, inclusion_label and
# default attributes.
my @list_of_new_users = map {
my ($email, $gecos) = @$_;
# 4. Otherwise, i.e. a new user:
# INSERT new user with:
# email, gecos, subscribed=0, date, update, inclusion,
# (optional) inclusion_ext, inclusion_label and
# default attributes.
my $user = {
%{$ds->{default_user_options} // {}},
email => $email,
......@@ -495,99 +508,27 @@ sub _update_users {
inclusion_label => $ds->name,
};
$result{added}++;
$user;
} @to_be_inserted;
if ($role eq 'member') {
$list->add_list_member(@list_of_new_users);
if ($role eq 'member') {
$list->add_list_member($user);
foreach my $new_user (@list_of_new_users) {
# Send notification if the list config authorizes it only.
if ($list->{'admin'}{'inclusion_notification_feature'} eq 'on') {
unless (
$list->send_probe_to_user(
'welcome', $new_user->{'email'}
)
) {
unless ($list->send_probe_to_user('welcome', $email)) {
$log->syslog('err',
'Unable to send "welcome" probe to %s',
$new_user->{'email'});
'Unable to send "welcome" probe to %s', $email);
}
}
} else {
$list->add_list_admin($role, $user);
}
} else {
$list->add_list_admin($role, @list_of_new_users);
$result{added}++;
}
$ds->close;
return %result;
}
# Internal function.
sub __update_user {
my $ds = shift;
my $email = shift;
my $start_time = shift;
my $list = $ds->{context};
my $role = $ds->role;
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $start_time unless $start_time <= time;
my $sdm = Sympa::DatabaseManager->instance;
return unless $sdm;
my $sth;
my ($t, $r) =
($role eq 'member')
? ('subscriber', '')
: ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));
if ($ds->is_external) {
# Already updated by the other non-external data source but not yet
# by any other external ones:
# Update inclusion_ext (and inclusion) field, but not inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?, inclusion_ext_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND ? <= inclusion_$t},
$time, $time,
$email, $list->{'name'}, $list->{'domain'},
$start_time
);
return (updated => 0) if $sth->rows;
# Not yet updated by any other data sources:
# Update inclusion_ext (and inclusion), and assign inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?, inclusion_ext_$t = ?,
inclusion_label_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r},
$time, $time,
$ds->name,
$email, $list->{'name'}, $list->{'domain'}
);
return (updated => 1) if $sth->rows;
} else {
# Not yet updated by any other data sources:
# Update inclusion, and assign inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?,
inclusion_label_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r},
$time,
$ds->name,
$email, $list->{'name'}, $list->{'domain'}
);
return (updated => 1) if $sth->rows;
}
}
sub _expire_users {
my $list = shift;
my $role = shift;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment