Unverified Commit ea2ebd3d authored by IKEDA Soji's avatar IKEDA Soji Committed by GitHub
Browse files

Merge pull request #1215 from ikedas/Jylhis/sync-performance by Jylhis & ikedas

Data source synchronization performance improvement (#1186)
parents 311b97e9 cca74db3
......@@ -4,8 +4,8 @@
# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright 2019 The Sympa Community. See the AUTHORS.md file at
# the top-level directory of this distribution and at
# Copyright 2019, 2021 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
# This program is free software; you can redistribute it and/or modify
......@@ -98,12 +98,17 @@ sub new {
$options{name} = substr $options{name}, 0, 50
if $options{name} and 50 < length $options{name};
return $type->_new(
my $self = $type->_new(
%options,
_role => $role,
_defkeys => [@defkeys],
_defvals => [@defvals],
);
$self->{_external} = not($self->isa('Sympa::DataSource::List')
and [split /\@/, $self->{listname}, 2]->[1] eq $list->{'domain'})
if ref $list eq 'Sympa::List';
$self;
}
sub _new {
......@@ -270,6 +275,10 @@ sub is_allowed_to_sync {
return 1;
}
sub is_external {
shift->{_external};
}
1;
__END__
......@@ -360,6 +369,26 @@ A new instance, or C<undef> on failure.
I<Instance method>.
Closes backend and does cleanup.
=item is_external ( )
I<Instance method>.
Returns true value if the data source is external data source.
"External" means that it is not C<include_sympa_list> (the instance of
L<Sympa::DataSource::List>) or not including any lists on local domain.
Known bug:
=over
=item *
If a data source is a list included from the other external data source(s),
this method will treat it as non-external so that some requests not allowed
for external data sources, such as C<move_user> request, on corresponding
users may be allowed.
=back
=item next ( )
I<Instance method>.
......
......@@ -8,8 +8,8 @@
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2017, 2018, 2019 The Sympa Community. See the AUTHORS.md file at
# the top-level directory of this distribution and at
# Copyright 2017, 2018, 2019, 2021 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
# This program is free software; you can redistribute it and/or modify
......@@ -108,6 +108,13 @@ sub connect {
$self);
return 1;
}
# Disconnected: Transaction (if any) was aborted.
if (delete $self->{_sdbTransactionLevel}) {
$log->syslog('err', 'Transaction on database %s was aborted: %s',
$self, $DBI::errstr);
$self->set_persistent($self->{_sdbPrevPersistency});
return undef;
}
# Do we have required parameters?
foreach my $param (@{$self->required_parameters}) {
......@@ -406,6 +413,56 @@ sub prepare_query_log_values {
# DEPRECATED: Use tools::eval_in_time() and fetchall_arrayref().
#sub fetch();
sub begin {
my $self = shift;
my $dbh = $self->__dbh;
return undef unless $dbh;
return undef unless $dbh->begin_work;
$self->{_sdbTransactionLevel} //= 0;
unless ($self->{_sdbTransactionLevel}++) {
$self->{_sdbPrevPersistency} = $self->set_persistent(0);
}
return 1;
}
sub _finalize_transaction {
my $self = shift;
unless (defined $self->{_sdbTransactionLevel}) {
return;
}
unless ($self->{_sdbTransactionLevel}) {
die 'bug in logic. Ask developer';
}
unless (--$self->{_sdbTransactionLevel}) {
$self->set_persistent($self->{_sdbPrevPersistency});
}
}
sub commit {
my $self = shift;
my $dbh = $self->__dbh;
return undef unless $dbh;
$self->_finalize_transaction;
return $dbh->commit;
}
sub rollback {
my $self = shift;
my $dbh = $self->__dbh;
return undef unless $dbh;
$self->_finalize_transaction;
return $dbh->rollback;
}
sub disconnect {
my $self = shift;
......@@ -434,12 +491,14 @@ sub set_persistent {
my $self = shift;
my $flag = shift;
my $ret = $persistent_connection_of{$self->get_id};
if ($flag) {
$persistent_connection_of{$self->get_id} = 1;
} elsif (defined $flag) {
delete $persistent_connection_of{$self->get_id};
}
return $self;
# Returns the previous value of the flag (6.2.65b.1 or later)
return $ret;
}
sub ping {
......@@ -534,6 +593,16 @@ TBD.
I<Constructor>.
Creates new database instance.
=item begin ( )
I<Instance method>, I<only for SQL>.
Begin transaction.
=item commit ( )
I<Instance method>, I<only for SQL>.
Commit transaction.
=item do_operation ( $operation, options... )
I<Instance method>, I<only for LDAP>.
......@@ -564,6 +633,11 @@ Returns:
Statement handle (L<DBI::st> object or such), or C<undef>.
=item rollback ( )
I<Instance method>, I<only for SQL>.
Rollback transaction.
=back
=head1 SEE ALSO
......
......@@ -539,6 +539,14 @@ provided by L<Sympa::Database> class:
=over
=item begin ( )
I<Overridable>, I<only for SQL driver>.
=item commit ( )
I<Overridable>, I<only for SQL driver>.
=item do_operation ( $operation, $parameters, ...)
I<Overridable>, I<only for LDAP driver>.
......@@ -551,6 +559,10 @@ I<Overridable>, I<only for SQL driver>.
I<Overridable>, I<only for SQL driver>.
=item rollback ( )
I<Overridable>, I<only for SQL driver>.
=item AS_DOUBLE ( $value )
I<Overridable>.
......
......@@ -520,10 +520,50 @@ sub translate_type {
return $type;
}
# As SQLite does not support nested transactions, these are not effective
# during when {_sdbSQLiteTransactionLevel} attribute is positive, i.e. only
# the outermost transaction will be available.
sub begin {
my $self = shift;
$self->{_sdbSQLiteTransactionLevel} //= 0;
if ($self->{_sdbSQLiteTransactionLevel}++) {
return 1;
}
return $self->SUPER::begin;
}
sub commit {
my $self = shift;
unless ($self->{_sdbSQLiteTransactionLevel}) {
die 'bug in logic. Ask developer';
}
if (--$self->{_sdbSQLiteTransactionLevel}) {
return 1;
}
return $self->SUPER::commit;
}
sub rollback {
my $self = shift;
unless ($self->{_sdbSQLiteTransactionLevel}) {
die 'bug in logic. Ask developer';
}
if (--$self->{_sdbSQLiteTransactionLevel}) {
return 1;
}
return $self->SUPER::rollback;
}
# Note:
# To prevent "database is locked" error, acquire "immediate" lock
# by each query. Most queries excluding "SELECT" need to lock in this
# manner.
# - To prevent "database is locked" error, acquire "immediate" lock
# by each query. Most queries excluding "SELECT" need to lock in this
# manner.
# - If a transaction has been begun, lock is not needed, because SQLite
# does not support nested transactions.
sub do_query {
my $self = shift;
my $sth;
......@@ -531,8 +571,8 @@ sub do_query {
my $need_lock =
($_[0] =~
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i
);
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i)
unless $self->{_sdbSQLiteTransactionLevel};
## acquire "immediate" lock
unless (!$need_lock or $self->__dbh->begin_work) {
......@@ -571,8 +611,8 @@ sub do_prepared_query {
my $need_lock =
($_[0] =~
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i
);
/^\s*(ALTER|CREATE|DELETE|DROP|INSERT|REINDEX|REPLACE|UPDATE)\b/i)
unless $self->{_sdbSQLiteTransactionLevel};
## acquire "immediate" lock
unless (!$need_lock or $self->__dbh->begin_work) {
......
......@@ -1547,6 +1547,7 @@ sub delete_list_member {
my $total = 0;
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $who (@u) {
next unless defined $who and length $who;
......@@ -1602,9 +1603,15 @@ sub delete_list_member {
$total--;
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at delete member commit: %s', $sdm->error);
$sdm->rollback;
return 0;
}
$self->_cache_publish_expiry('member');
return (-1 * $total);
return (-1 * $total);
}
## Delete the indicated admin users from the list.
......@@ -1616,12 +1623,13 @@ sub delete_list_admin {
my $total = 0;
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $who (@u) {
next unless defined $who and length $who;
$who = Sympa::Tools::Text::canonic_email($who);
my $sdm = Sympa::DatabaseManager->instance;
# Delete record in ADMIN
unless (
$sdm
......@@ -1641,6 +1649,12 @@ sub delete_list_admin {
$total--;
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at add member commit: %s', $sdm->error);
$sdm->rollback;
return 0;
}
$self->_cache_publish_expiry('admin_user');
return (-1 * $total);
......@@ -1976,55 +1990,8 @@ sub get_exclusion {
return $data_exclu;
}
sub is_member_excluded {
my $self = shift;
my $email = shift;
return undef unless defined $email and length $email;
$email = Sympa::Tools::Text::canonic_email($email);
my $sdm = Sympa::DatabaseManager->instance;
my $sth;
if (defined $self->{'admin'}{'family_name'}
and length $self->{'admin'}{'family_name'}) {
unless (
$sdm
and $sth = $sdm->do_prepared_query(
q{SELECT COUNT(*)
FROM exclusion_table
WHERE (list_exclusion = ? OR family_exclusion = ?) AND
robot_exclusion = ? AND
user_exclusion = ?},
$self->{'name'}, $self->{'admin'}{'family_name'},
$self->{'domain'},
$email
)
) {
#FIXME: report error
return undef;
}
} else {
unless (
$sdm
and $sth = $sdm->do_prepared_query(
q{SELECT COUNT(*)
FROM exclusion_table
WHERE list_exclusion = ? AND robot_exclusion = ? AND
user_exclusion = ?},
$self->{'name'}, $self->{'domain'},
$email
)
) {
#FIXME: report error
return undef;
}
}
my ($count) = $sth->fetchrow_array;
$sth->finish;
return $count || 0;
}
# DEPRECATED. No longer used.
#sub is_member_excluded;
# Mapping between var and field names.
sub _map_list_member_cols {
......@@ -3220,6 +3187,7 @@ sub add_list_member {
}
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $new_user (@new_users) {
my $who = Sympa::Tools::Text::canonic_email($new_user->{'email'});
......@@ -3259,7 +3227,6 @@ sub add_list_member {
$new_user->{'date'} ||= time;
$new_user->{'update_date'} ||= $new_user->{'date'};
my $custom_attribute;
if (ref $new_user->{'custom_attribute'} eq 'HASH') {
$new_user->{'custom_attribute'} =
Sympa::Tools::Data::encode_custom_attribute(
......@@ -3368,6 +3335,11 @@ sub add_list_member {
$current_list_members_count++;
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at add member commit: %s', $sdm->error);
$sdm->rollback;
}
$self->_cache_publish_expiry('member');
$self->_create_add_error_string() if ($self->{'add_outcome'}{'errors'});
return 1;
......@@ -3404,11 +3376,22 @@ sub add_list_admin {
my @users = @_;
my $total = 0;
my $sdm = Sympa::DatabaseManager->instance;
$sdm->begin;
foreach my $user (@users) {
$total++ if $self->_add_list_admin($role, $user);
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at add admin commit: %s', $sdm->error);
$sdm->rollback;
return 0;
}
$self->_cache_publish_expiry('admin_user') if $total;
return $total;
}
......@@ -6577,7 +6560,7 @@ Returns true if the indicated user is member of the list.
=item is_member_excluded ( $email )
I<Instance method>.
FIXME @todo doc
B<Deprecated>.
=item is_moderated ()
......
......@@ -4,8 +4,8 @@
# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright 2019, 2020 The Sympa Community. See the AUTHORS.md
# file at the top-level directory of this distribution and at
# Copyright 2019, 2020, 2021 The Sympa Community. See the
# AUTHORS.md file at the top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
# This program is free software; you can redistribute it and/or modify
......@@ -367,22 +367,152 @@ sub _update_users {
my $start_time = shift;
return unless $ds->open;
my $list = $ds->{context};
my $role = $ds->role;
my $sdm = Sympa::DatabaseManager->instance;
return unless $sdm;
my $sth;
my %result = (added => 0, deleted => 0, updated => 0, kept => 0);
my %users;
my ($t, $r) =
($role eq 'member')
? ('subscriber', '')
: ('admin', sprintf ' AND role_admin = %s', $sdm->quote($role));
return unless $sth = $sdm->do_prepared_query(
qq{
SELECT user_$t, inclusion_$t, inclusion_ext_$t
FROM ${t}_table
WHERE list_$t = ? AND robot_$t = ?$r},
$list->{'name'}, $list->{'domain'}
);
while (my @row = $sth->fetchrow_array) {
my $user_email = Sympa::Tools::Text::canonic_email($row[0]);
my $user_inclusion = $row[1];
my $user_inclusion_ext = $row[2];
$users{$user_email} = {
inclusion => $user_inclusion,
inclusion_ext => $user_inclusion_ext
};
}
my %exclusion_list =
map { (Sympa::Tools::Text::canonic_email($_) => 1) }
@{$list->get_exclusion->{emails}}
if $role eq 'member';
my @to_be_inserted;
$sdm->begin;
while (my $entry = $ds->next) {
my ($email, $other_value) = @$entry;
my %res = __update_user($ds, $email, $other_value, $start_time);
my $email = $entry->[0];
next unless Sympa::Tools::Text::valid_email($email);
$email = Sympa::Tools::Text::canonic_email($email);
# 1. If role of the data source is 'member' and the user is excluded:
# Do nothing.
next if $role eq 'member' and exists $exclusion_list{$email};
# 4. If the user is not a member, i.e. a new user:
# INSERT new user (see below).
unless (exists $users{$email}) {
push @to_be_inserted, $entry;
next;
}
# 2. If user has already been updated by the other data sources:
# Keep user.
my ($inclusion, $inclusion_ext) =
($users{$email}->{inclusion}, $users{$email}->{inclusion_ext});
if ($ds->is_external) {
if ( defined $inclusion
and $start_time <= int($inclusion)
and defined $inclusion_ext
and $start_time <= int($inclusion_ext)) {
$result{kept}++;
next;
}
} else {
if (defined $inclusion and $start_time <= int($inclusion)) {
$result{kept}++;
next;
}
}
# 3. If user (has not been updated by the other data sources and)
# exists:
# UPDATE inclusion.
my %res = __update_user($ds, $email, $start_time);
unless (%res) {
$log->syslog('info', '%s: Aborted update', $ds);
$sdm->rollback;
$ds->close;
$log->syslog('info', '%s: Aborted inclusion', $ds);
return;
}
foreach my $res (keys %res) {
$result{$res} += $res{$res} if exists $result{$res};
}
$result{updated} += $res{updated} if $res{updated};
}
unless ($sdm->commit) {
$log->syslog('err', 'Error at update user commit: %s', $sdm->error);
$sdm->rollback;
$ds->close;
return;
}
my $time = time;
# Avoid retrace of clock e.g. by outage of NTP server.
$time = $start_time unless $start_time <= time;
# INSERT new user with:
# email, gecos, subscribed=0, date, update, inclusion,
# (optional) inclusion_ext, inclusion_label and
# default attributes.
my @list_of_new_users = map {
my ($email, $gecos) = @$_;
my $user = {
email => $email,
gecos => $gecos,
subscribed => 0,
date => $time,
update_date => $time,
inclusion => $time,
($ds->is_external ? (inclusion_ext => $time) : ()),
inclusion_label => $ds->name,
};
my @defkeys = @{$ds->{_defkeys} || []};
my @defvals = @{$ds->{_defvals} || []};
@{$user}{@defkeys} = @defvals if @defkeys;
$result{added}++;
$user;
} @to_be_inserted;
if ($role eq 'member') {
$list->add_list_member(@list_of_new_users);
foreach my $new_user (@list_of_new_users) {
if ($list->{'admin'}{'inclusion_notification_feature'} eq 'on') {
unless (
$list->send_probe_to_user(
'welcome', $new_user->{'email'}
)
) {
$log->syslog('err',
'Unable to send "welcome" probe to %s',
$new_user->{'email'});
}
}