Commit 74b429b0 authored by IKEDA Soji's avatar IKEDA Soji
Browse files

Revert changes for #1186 "Improving data source synchronization performance"

And small refactoring.
parent 9585c001
......@@ -8,7 +8,7 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2017, 2018, 2019, 2021 The Sympa Community. See the
# Copyright 2017, 2018, 2019, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
......@@ -108,13 +108,6 @@ sub connect {
$self);
return 1;
}
# Disconnected: Transaction (if any) was aborted.
if (delete $self->{_sdbTransactionLevel}) {
$log->syslog('err', 'Transaction on database %s was aborted: %s',
$self, $DBI::errstr);
$self->set_persistent($self->{_sdbPrevPersistency});
return undef;
}
# Do we have required parameters?
foreach my $param (@{$self->required_parameters}) {
......@@ -413,56 +406,6 @@ sub prepare_query_log_values {
# DEPRECATED: Use tools::eval_in_time() and fetchall_arrayref().
#sub fetch();
sub begin {
my $self = shift;
my $dbh = $self->__dbh;
return undef unless $dbh;
return undef unless $dbh->begin_work;
$self->{_sdbTransactionLevel} //= 0;
unless ($self->{_sdbTransactionLevel}++) {
$self->{_sdbPrevPersistency} = $self->set_persistent(0);
}
return 1;
}
sub _finalize_transaction {
my $self = shift;
unless (defined $self->{_sdbTransactionLevel}) {
return;
}
unless ($self->{_sdbTransactionLevel}) {
die 'bug in logic. Ask developer';
}
unless (--$self->{_sdbTransactionLevel}) {
$self->set_persistent($self->{_sdbPrevPersistency});
}
}
sub commit {
my $self = shift;
my $dbh = $self->__dbh;
return undef unless $dbh;
$self->_finalize_transaction;
return $dbh->commit;
}
sub rollback {
my $self = shift;
my $dbh = $self->__dbh;
return undef unless $dbh;
$self->_finalize_transaction;
return $dbh->rollback;
}
sub disconnect {
my $self = shift;
......@@ -593,16 +536,6 @@ TBD.
I<Constructor>.
Creates new database instance.
=item begin ( )
I<Instance method>, I<only for SQL>.
Begin transaction.
=item commit ( )
I<Instance method>, I<only for SQL>.
Commit transaction.
=item do_operation ( $operation, options... )
I<Instance method>, I<only for LDAP>.
......@@ -633,11 +566,6 @@ Returns:
Statement handle (L<DBI::st> object or such), or C<undef>.
=item rollback ( )
I<Instance method>, I<only for SQL>.
Rollback transaction.
=back
=head1 SEE ALSO
......
......@@ -8,8 +8,8 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2018 The Sympa Community. See the AUTHORS.md file at the
# top-level directory of this distribution and at
# Copyright 2018, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
# This program is free software; you can redistribute it and/or modify
......@@ -539,14 +539,6 @@ provided by L<Sympa::Database> class:
=over
=item begin ( )
I<Overridable>, I<only for SQL driver>.
=item commit ( )
I<Overridable>, I<only for SQL driver>.
=item do_operation ( $operation, $parameters, ...)
I<Overridable>, I<only for LDAP driver>.
......@@ -559,10 +551,6 @@ I<Overridable>, I<only for SQL driver>.
I<Overridable>, I<only for SQL driver>.
=item rollback ( )
I<Overridable>, I<only for SQL driver>.
=item AS_DOUBLE ( $value )
I<Overridable>.
......
......@@ -8,7 +8,7 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2018, 2021 The Sympa Community. See the
# Copyright 2018, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
......@@ -520,50 +520,10 @@ sub translate_type {
return $type;
}
# As SQLite does not support nested transactions, these are not effective
# during when {_sdbSQLiteTransactionLevel} attribute is positive, i.e. only
# the outermost transaction will be available.
sub begin {
my $self = shift;
$self->{_sdbSQLiteTransactionLevel} //= 0;
if ($self->{_sdbSQLiteTransactionLevel}++) {
return 1;
}
return $self->SUPER::begin;
}
sub commit {
my $self = shift;
unless ($self->{_sdbSQLiteTransactionLevel}) {
die 'bug in logic. Ask developer';
}
if (--$self->{_sdbSQLiteTransactionLevel}) {
return 1;
}
return $self->SUPER::commit;
}
sub rollback {
my $self = shift;
unless ($self->{_sdbSQLiteTransactionLevel}) {
die 'bug in logic. Ask developer';
}
if (--$self->{_sdbSQLiteTransactionLevel}) {
return 1;
}
return $self->SUPER::rollback;
}
# Note:
# - To prevent "database is locked" error, acquire "immediate" lock
# by each query. Most queries excluding "SELECT" need to lock in this
# manner.
# - If a transaction has been begun, lock is not needed, because SQLite
# does not support nested transactions.
# To prevent "database is locked" error, acquire "immediate" lock
# by each query. Most queries excluding "SELECT" need to lock in this
# manner.
sub do_query {
my $self = shift;
my $sth;
......@@ -571,8 +531,8 @@ sub do_query {
my $need_lock =
($_[0] =~
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i)
unless $self->{_sdbSQLiteTransactionLevel};
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i
);
## acquire "immediate" lock
unless (!$need_lock or $self->__dbh->begin_work) {
......@@ -611,8 +571,8 @@ sub do_prepared_query {
my $need_lock =
($_[0] =~
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i)
unless $self->{_sdbSQLiteTransactionLevel};
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i
);
## acquire "immediate" lock
unless (!$need_lock or $self->__dbh->begin_work) {
......
......@@ -8,7 +8,7 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2017, 2018, 2019, 2020, 2021 The Sympa Community. See the
# Copyright 2017, 2018, 2019, 2020, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
......@@ -1547,7 +1547,6 @@ sub delete_list_member {
my $total = 0;
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $who (@u) {
next unless defined $who and length $who;
......@@ -1603,12 +1602,6 @@ sub delete_list_member {
$total--;
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at delete member commit: %s', $sdm->error);
$sdm->rollback;
return 0;
}
$self->_cache_publish_expiry('member');
return (-1 * $total);
......@@ -1617,16 +1610,16 @@ sub delete_list_member {
## Delete the indicated admin users from the list.
sub delete_list_admin {
$log->syslog('debug2', '(%s, %s, ...)', @_);
my $self = shift;
my $role = shift;
my @u = @_;
my $self = shift;
my $role = shift;
my $users = shift;
my $total = 0;
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $who (@u) {
$users = [$users] unless ref $users; # compat.
foreach my $who (@$users) {
next unless defined $who and length $who;
$who = Sympa::Tools::Text::canonic_email($who);
......@@ -1649,12 +1642,6 @@ sub delete_list_admin {
$total--;
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at add member commit: %s', $sdm->error);
$sdm->rollback;
return 0;
}
$self->_cache_publish_expiry('admin_user');
return (-1 * $total);
......@@ -3187,7 +3174,6 @@ sub add_list_member {
}
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $new_user (@new_users) {
my $who = Sympa::Tools::Text::canonic_email($new_user->{'email'});
......@@ -3335,11 +3321,6 @@ sub add_list_member {
$current_list_members_count++;
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at add member commit: %s', $sdm->error);
$sdm->rollback;
}
$self->_cache_publish_expiry('member');
$self->_create_add_error_string() if ($self->{'add_outcome'}{'errors'});
return 1;
......@@ -3376,20 +3357,10 @@ sub add_list_admin {
my @users = @_;
my $total = 0;
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $user (@users) {
$total++ if $self->_add_list_admin($role, $user);
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at add admin commit: %s', $sdm->error);
$sdm->rollback;
return 0;
}
$self->_cache_publish_expiry('admin_user') if $total;
return $total;
......
......@@ -172,7 +172,7 @@ sub _close {
# Remove entries from admin_table.
foreach my $role (qw(editor owner)) {
$list->delete_list_admin($role, $list->get_admins_email($role));
$list->delete_list_admin($role, [$list->get_admins_email($role)]);
}
# Change status & save config.
......
......@@ -4,7 +4,7 @@
# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright 2019, 2020, 2021 The Sympa Community. See the
# Copyright 2019, 2020, 2021, 2022 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
......@@ -374,107 +374,120 @@ sub _update_users {
return unless $sdm;
my $sth;
my %result = (added => 0, deleted => 0, updated => 0, kept => 0);
my %users;
my ($t, $r) =
($role eq 'member')
? ('subscriber', '')
: ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));
return unless $sth = $sdm->do_prepared_query(
qq{
SELECT user_$t, inclusion_$t, inclusion_ext_$t
FROM ${t}_table
WHERE list_$t = ? AND robot_$t = ?$r},
$list->{'name'}, $list->{'domain'}
);
while (my @row = $sth->fetchrow_array) {
my $user_email = Sympa::Tools::Text::canonic_email($row[0]);
my $user_inclusion = $row[1];
my $user_inclusion_ext = $row[2];
$users{$user_email} = {
inclusion => $user_inclusion,
inclusion_ext => $user_inclusion_ext
};
}
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $start_time unless $start_time <= time;
my %exclusion_list =
map { (Sympa::Tools::Text::canonic_email($_) => 1) }
@{$list->get_exclusion->{emails}}
if $role eq 'member';
my @to_be_inserted;
$sdm->begin;
my %result = (added => 0, deleted => 0, updated => 0, kept => 0);
while (my $entry = $ds->next) {
my $email = $entry->[0];
next unless Sympa::Tools::Text::valid_email($email);
$email = Sympa::Tools::Text::canonic_email($email);
my ($email, $gecos) = @$entry;
# 1. If role of the data source is 'member' and the user is excluded:
# Do nothing.
next if $role eq 'member' and exists $exclusion_list{$email};
# 4. If the user is not a member, i.e. a new user:
# INSERT new user (see below).
unless (exists $users{$email}) {
push @to_be_inserted, $entry;
next;
}
# 2. If user has already been updated by the other data sources:
# Keep user.
my ($inclusion, $inclusion_ext) =
($users{$email}->{inclusion}, $users{$email}->{inclusion_ext});
if ($ds->is_external) {
if ( defined $inclusion
and $start_time <= int($inclusion)
and defined $inclusion_ext
and $start_time <= int($inclusion_ext)) {
$result{kept}++;
next;
}
return unless $sth = $sdm->do_prepared_query(
qq{SELECT COUNT(*)
FROM ${t}_table
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND
? <= inclusion_$t AND
inclusion_ext_$t IS NOT NULL AND
? <= inclusion_ext_$t},
$email, $list->{'name'}, $list->{'domain'},
$start_time,
$start_time
);
} else {
if (defined $inclusion and $start_time <= int($inclusion)) {
$result{kept}++;
next;
}
return unless $sth = $sdm->do_prepared_query(
qq{SELECT COUNT(*)
FROM ${t}_table
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND
? <= inclusion_$t},
$email, $list->{'name'}, $list->{'domain'},
$start_time
);
}
my ($count) = $sth->fetchrow_array;
$sth->finish;
if ($count) {
$result{kept}++;
next;
}
# 3. If user (has not been updated by the other data sources and)
# exists:
# UPDATE inclusion.
my %res = __update_user($ds, $email, $start_time);
if ($ds->is_external) {
# Already updated by the other non-external data source but not
# yet by any other external ones:
# Update inclusion_ext (and inclusion) field, but not
# inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?, inclusion_ext_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND ? <= inclusion_$t},
$time, $time,
$email, $list->{'name'}, $list->{'domain'},
$start_time
);
if ($sth->rows) {
next;
}
unless (%res) {
$log->syslog('info', '%s: Aborted update', $ds);
$sdm->rollback;
$ds->close;
return;
# Not yet updated by any other data sources:
# Update inclusion_ext (and inclusion), and assign
# inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?, inclusion_ext_$t = ?,
inclusion_label_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r},
$time, $time,
$ds->name,
$email, $list->{'name'}, $list->{'domain'}
);
if ($sth->rows) {
$result{updated}++;
next;
}
} else {
# Not yet updated by any other data sources:
# Update inclusion, and assign inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?,
inclusion_label_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r},
$time,
$ds->name,
$email, $list->{'name'}, $list->{'domain'}
);
if ($sth->rows) {
$result{updated}++;
next;
}
}
$result{updated} += $res{updated} if $res{updated};
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at update user commit: %s', $sdm->error);
$sdm->rollback;
$ds->close;
return;
}
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $start_time unless $start_time <= time;
# INSERT new user with:
# email, gecos, subscribed=0, date, update, inclusion,
# (optional) inclusion_ext, inclusion_label and
# default attributes.
my @list_of_new_users = map {
my ($email, $gecos) = @$_;
# 4. Otherwise, i.e. a new user:
# INSERT new user with:
# email, gecos, subscribed=0, date, update, inclusion,
# (optional) inclusion_ext, inclusion_label and
# default attributes.
my $user = {
email => $email,
gecos => $gecos,
......@@ -489,99 +502,27 @@ sub _update_users {
my @defvals = @{$ds->{_defvals} || []};
@{$user}{@defkeys} = @defvals if @defkeys;
$result{added}++;
$user;
} @to_be_inserted;
if ($role eq 'member') {
$list->add_list_member(@list_of_new_users);
if ($role eq 'member') {
$list->add_list_member($user);
foreach my $new_user (@list_of_new_users) {
# Send notification if the list config authorizes it only.
if ($list->{'admin'}{'inclusion_notification_feature'} eq 'on') {
unless (
$list->send_probe_to_user(
'welcome', $new_user->{'email'}
)
) {
unless ($list->send_probe_to_user('welcome', $email)) {
$log->syslog('err',
'Unable to send "welcome" probe to %s',
$new_user->{'email'});
'Unable to send "welcome" probe to %s', $email);
}
}
} else {
$list->add_list_admin($role, $user);
}
} else {
$list->add_list_admin($role, @list_of_new_users);
$result{added}++;
}
$ds->close;
return %result;
}
# Internal function.
sub __update_user {
my $ds = shift;
my $email = shift;
my $start_time = shift;
my $list = $ds->{context};
my $role = $ds->role;
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $start_time unless $start_time <= time;
my $sdm = Sympa::DatabaseManager->instance;
return unless $sdm;
my $sth;
my ($t, $r) =
($role eq 'member')
? ('subscriber', '')
: ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));
if ($ds->is_external) {
# Already updated by the other non-external data source but not yet
# by any other external ones:
# Update inclusion_ext (and inclusion) field, but not inclusion_label.
return unless $sth = $sdm->do_prepared_query(
qq{UPDATE ${t}_table
SET inclusion_$t = ?, inclusion_ext_$t = ?
WHERE user_$t = ? AND list_$t = ? AND robot_$t = ?$r AND
inclusion_$t IS NOT NULL AND ? <= inclusion_$t},
$time, $time,
$email, $list->{'name'}, $list->{'domain'},
$start_time