Commit c0b1444a authored by sikeda's avatar sikeda
Browse files

[dev] Refactoring. Managee child PIDs by single point, {children} attribute...

[dev] Refactoring.  Managee child PIDs by single point, {children} attribute of Sympa::Process instance.
By this change, number of children (slave bulks and sendmail processes) will be estimated properly.  And Proc::ProcessTable module may no longer be required.

ToDo: Use of SIGCHLD handler might be considered.


git-svn-id: https://subversion.renater.fr/sympa/branches/sympa-6.2-branch@12511 05aa8bb8-cd2b-0410-b1d7-8918dfa770ce
parent eb00007c
......@@ -165,8 +165,8 @@ sub store {
die sprintf 'Unable to create a SMTP channel: %s', $ERRNO;
# No return
}
$pid = _safefork();
$self->{_pids}->{$pid} = $message->get_id;
$pid = _safefork($message->get_id);
$self->{_pids}->{$pid} = 1;
unless ($pid) { # _safefork() would die if fork() had failed.
# Child
......@@ -248,9 +248,11 @@ sub store {
# Old name: tools::safefork().
# Note: Use store().
sub _safefork {
my $tag = shift;
my $err;
for (my $i = 1; $i < 4; $i++) {
my $pid = $process->fork;
my $pid = $process->fork($tag);
return $pid if defined $pid;
$err = $ERRNO;
......@@ -258,7 +260,8 @@ sub _safefork {
#FIXME:should send a mail to the listmaster
sleep(10 * $i);
}
die sprintf 'Exiting because cannot create new process: %s', $err;
die sprintf 'Exiting because cannot create new process for <%s>: %s',
$tag, $err;
# No return.
}
......
......@@ -31,6 +31,8 @@ use base qw(Class::Singleton);
use English qw(-no_match_vars);
use POSIX qw();
use Sympa::Constants;
use Sympa::LockedFile;
use Sympa::Log;
my $log = Sympa::Log->instance;
......@@ -44,12 +46,13 @@ sub _new_instance {
sub fork {
my $self = shift;
my $tag = shift || [split /\//, $PROGRAM_NAME]->[-1];
my $pid = CORE::fork();
unless (defined $pid) {
;
} elsif ($pid) {
$self->{children}->{$pid} = 1;
$self->{children}->{$pid} = $tag;
} else {
$self->{children} = {};
}
......@@ -61,9 +64,9 @@ sub reap_child {
my $self = shift;
my %options = @_;
my $blocking = $options{blocking};
my $pid;
my $blocking = $options{blocking};
while (0 < ($pid = waitpid(-1, $blocking ? 0 : POSIX::WNOHANG()))) {
$blocking = 0;
unless (exists $self->{children}->{$pid}) {
......@@ -74,15 +77,14 @@ sub reap_child {
if ($CHILD_ERROR & 127) {
$log->syslog(
'err',
'Child process %s for message <%s> was terminated by signal %d',
'Child process %s for <%s> was terminated by signal %d',
$pid,
$self->{children}->{$pid},
$CHILD_ERROR & 127
);
} elsif ($CHILD_ERROR) {
$log->syslog(
'err',
'Child process %s for message <%s> exited with status %s',
'err', 'Child process %s for <%s> exited with status %s',
$pid,
$self->{children}->{$pid},
$CHILD_ERROR >> 8
......@@ -105,9 +107,49 @@ sub reap_child {
}
}
if ($options{pidname}) {
my $pidname = $options{pidname};
foreach my $child_pid (_get_pids_in_pid_file($pidname)) {
next if $child_pid == $PID;
unless (exists $self->{children}->{$child_pid}) {
$log->syslog(
'err',
'The %s child exists in the PID file but is no longer running. Removing it and notifying listmaster',
$child_pid
);
Sympa::Tools::Daemon::remove_pid($pidname, $child_pid,
{multiple_process => 1});
Sympa::Tools::Daemon::send_crash_report(
pid => $child_pid,
pname => [split /\//, $PROGRAM_NAME]->[-1]
);
}
}
}
return $pid;
}
# Old name: Sympa::Tools::Daemon::get_pids_in_pid_file().
sub _get_pids_in_pid_file {
my $name = shift;
my $piddir = Sympa::Constants::PIDDIR;
my $pidfile = $piddir . '/' . $name . '.pid';
my $lock_fh = Sympa::LockedFile->new($pidfile, 5, '<');
unless ($lock_fh) {
$log->syslog('err', 'Unable to open PID file %s: %m', $pidfile);
return;
}
my $l = <$lock_fh>;
my @pids = grep {/^[0-9]+$/} split(/\s+/, $l);
$lock_fh->close;
return @pids;
}
1;
__END__
......@@ -143,13 +185,29 @@ Returns:
A new L<Sympa::Process> instance, or I<undef> for failure.
=item fork ( )
=item fork ( [ $tag ] )
I<Instance method>.
Forks process.
Note that this method should be used instead of fork() in Perl core.
=item reap_child ( [ blocking =E<gt> 1 ], [ children =E<gt> \%children ] )
Parameter:
=over
=item $tag
A string to determine new child process.
By default the name of calling process.
=back
Returns:
See L<perlfunc/"fork">.
=item reap_child ( [ blocking =E<gt> 1 ], [ children =E<gt> \%children ],
pidname =E<gt> $name ] )
I<Instance method>.
Non blocking function called by: main loop of sympa, task_manager, bounced
......@@ -166,7 +224,12 @@ Operation would block.
=item children =E<gt> \%children
Syncs local map of child PIDs.
Syncs PIDs in local map %children.
=item pidname =E<gt> $name
Syncs child PIDs in PID file named $name.pid.
If dead PID is found, notification will be send to listmaster.
=back
......
......@@ -42,12 +42,15 @@ use Sympa::List;
use Sympa::Log;
use Sympa::Mailer;
use Sympa::Message;
use Sympa::Process;
use tools;
use Sympa::Tools::Daemon;
use Sympa::Tools::Data;
use Sympa::Tools::DKIM;
use Sympa::Tracking;
my $process = Sympa::Process->instance;
my $date_of_last_activity = time();
local $main::daemon_usage =
'DAEMON_MASTER'; ## Default is to launch bulk as master daemon.
......@@ -193,7 +196,7 @@ while (!$end) {
$log->{level} = $default_log_level;
# Free zombie sendmail process.
Sympa::Mailer->instance->reaper;
Sympa::Process->instance->reap_child;
Sympa::List::init_list_cache();
# Process grouped notifications
......@@ -202,65 +205,26 @@ while (!$end) {
my ($message, $handle);
unless ($main::options{'foreground'}) {
##
## Create child bulks if too much packets are waiting to be sent in
## the bulk_mailer table.
## Only the main bulk process (DAEMON_MASTER) can create child
## processes
## Check if we need to run new child processes every
## 'bulk_wait_to_fork' (sympa.conf parameter) seconds
if ( ($main::daemon_usage eq 'DAEMON_MASTER')
&& (time() - $last_check_date > $timeout)) {
## Clean up PID file (in case some child bulks would have died)
my @actual_children =
Sympa::Tools::Daemon::get_children_processes_list();
my @remaining_children;
my %dead_children = %created_children;
foreach my $apid (@actual_children) {
if (defined $dead_children{$apid}) {
push @remaining_children, $apid;
delete $dead_children{$apid};
}
}
my @pids_leftover = keys %dead_children;
if ($#pids_leftover > -1) {
my @dc;
$log->syslog('debug2',
'Some childs of current process disappeared. Checking whether they shut down cleanly or not.'
);
my $pids_in_pfile =
Sympa::Tools::Daemon::get_pids_in_pid_file('bulk');
foreach my $fpid (@{$pids_in_pfile}) {
if (defined $dead_children{$fpid}) {
$log->syslog(
'err',
'The %s child exists in the PID file but is no longer running. Removing it and notyfying listmaster',
$fpid
);
my $pname = $0;
$pname =~ s/.*\/(\w+)/$1/;
Sympa::Tools::Daemon::send_crash_report(
('pid' => $fpid, 'pname' => $pname));
Sympa::Tools::Daemon::remove_pid('bulk', $fpid,
$options);
delete $dead_children{$fpid};
push @dc, $fpid;
}
}
}
my $pids_in_pfile =
Sympa::Tools::Daemon::get_pids_in_pid_file('bulk');
unless (defined($pids_in_pfile)) {
Sympa::Tools::Daemon::write_pid('bulk', $PID, $options);
close STDERR;
Sympa::Tools::Daemon::direct_stderr_to_file(('pid' => $PID));
$pids_in_pfile = [$PID];
}
# Create child bulks if too much packets are waiting to be sent in
# the bulk_mailer table.
# Only the main bulk process (DAEMON_MASTER) can create child
# processes.
# Check if we need to run new child processes every
# 'bulk_wait_to_fork' (sympa.conf parameter) seconds.
if ( $main::daemon_usage eq 'DAEMON_MASTER'
and $timeout < time - $last_check_date) {
# Clean up PID file (in case some child bulks would have died)
$process->reap_child(
children => \%created_children,
pidname => 'bulk'
);
# Start new processes if there remain at least
# 'bulk_fork_threshold' packets to send in the bulk spool.
if (my $r_packets = $bulk->too_much_remaining_packets()
and scalar(@remaining_children) <
$Conf::Conf{'bulk_max_count'}) {
my $spare_children =
$Conf::Conf{'bulk_max_count'} - scalar keys %created_children;
if (my $r_packets = $bulk->too_much_remaining_packets
and 0 < $spare_children) {
# Disconnect from database before fork to prevent DB handles
# to be shared by different processes. Sharing database
# handles may crash bulk.pl.
......@@ -271,14 +235,11 @@ while (!$end) {
'info',
'Important workload: %s packets to process. Creating %s child bulks to increase sending rate',
$r_packets,
$Conf::Conf{'bulk_max_count'} -
scalar(@remaining_children)
$spare_children
);
for my $process_count (
1 .. $Conf::Conf{'bulk_max_count'} -
scalar(@remaining_children)) {
for my $process_count (1 .. $spare_children) {
$log->syslog('info', "Will fork: %s", $process_count);
my $child_pid = fork;
my $child_pid = $process->fork;
if ($child_pid) {
$log->syslog('info',
'Starting bulk child daemon, PID %s',
......@@ -588,7 +549,7 @@ while (!$end) {
Sympa::Alarm->instance->flush(purge => 1);
# Free zombie sendmail process.
Sympa::Mailer->instance->reaper;
Sympa::Process->instance->reap_child;
$log->syslog('notice', 'Bulk exited normally due to signal');
Sympa::Tools::Daemon::remove_pid('bulk', $PID, $options);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment