-#!/usr/bin/perl
+#!/usr/bin/env perl
# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
=head1 NAME
use constant TASK_TEMPFAIL => 111;
use constant EX_TEMPFAIL => 75;
+use constant EX_RETRY_UNLOCKED => 93;
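+# EX_TEMPFAIL (75) follows the sysexits.h convention for temporary
+# failures. EX_RETRY_UNLOCKED appears to be crunch-job's own code: we
+# exit with it below, when every node has failed but tasks remain, so
+# the dispatcher can retry the job on a fresh set of nodes.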
$ENV{"TMPDIR"} ||= "/tmp";
unless (defined $ENV{"CRUNCH_TMP"}) {
$ENV{ARVADOS_API_TOKEN} = $job_api_token;
}
-my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
+my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
$SIG{'USR1'} = sub
{
Log (undef, "node $nodename - $ncpus slots");
my $node = { name => $nodename,
- ncpus => $ncpus,
- losing_streak => 0,
- hold_until => 0 };
+ ncpus => $ncpus,
+ # The number of consecutive times a task has been dispatched
+ # to this node and failed.
+ losing_streak => 0,
+ # The number of consecutive times that SLURM has reported
+ # a node failure since the last successful task.
+ fail_count => 0,
+ # Don't dispatch work to this node until this time
+ # (in seconds since the epoch) has passed.
+ hold_until => 0 };
foreach my $cpu (1..$ncpus)
{
push @slot, { node => $node,
my @jobstep_done = ();
my @jobstep_tomerge = ();
my $jobstep_tomerge_level = 0;
-my $squeue_checked;
-my $squeue_kill_checked;
+my $squeue_checked = 0;
my $latest_refresh = scalar time;
my $thisround_succeeded = 0;
my $thisround_failed = 0;
my $thisround_failed_multiple = 0;
+my $working_slot_count = scalar(@slot);
@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
or $a <=> $b } @jobstep_todo;
$Jobstep->{slotindex} = $childslot;
delete $Jobstep->{stderr};
delete $Jobstep->{finishtime};
+ delete $Jobstep->{tempfail};
$Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
$Jobstep->{'arvados_task'}->save;
{
update_progress_stats();
}
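+  # Recount the usable slots: a slot counts as working only if its node
+  # has no SLURM-reported failures (fail_count == 0) and has been put on
+  # hold fewer than 4 times (hold_count is presumably incremented by
+  # ban_node_by_slot).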
+ $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
+ $_->{node}->{hold_count} < 4 } @slot);
if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
{
}
# give up if no nodes are succeeding
- if (!grep { $_->{node}->{losing_streak} == 0 &&
- $_->{node}->{hold_count} < 4 } @slot) {
- my $message = "Every node has failed -- giving up on this round";
- Log (undef, $message);
+ if ($working_slot_count < 1) {
+ Log(undef, "Every node has failed -- giving up");
last THISROUND;
}
}
if (!defined $main::success)
{
- if (@jobstep_todo &&
- $thisround_succeeded == 0 &&
- ($thisround_failed == 0 || $thisround_failed > 4))
- {
+ if (!@jobstep_todo) {
+ $main::success = 1;
+ } elsif ($working_slot_count < 1) {
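+      # Every node has failed but tasks remain: save whatever output we
+      # have, then exit EX_RETRY_UNLOCKED so the job can be retried on
+      # other nodes instead of being marked permanently failed.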
+ save_output_collection();
+ save_meta();
+ exit(EX_RETRY_UNLOCKED);
+ } elsif ($thisround_succeeded == 0 &&
+ ($thisround_failed == 0 || $thisround_failed > 4)) {
my $message = "stop because $thisround_failed tasks failed and none succeeded";
Log (undef, $message);
$main::success = 0;
}
- if (!@jobstep_todo)
- {
- $main::success = 1;
- }
}
goto ONELEVEL if !defined $main::success;
release_allocation();
freeze();
-my $collated_output = &create_output_collection();
-
-if (!$collated_output) {
- Log (undef, "Failed to write output collection");
-}
-else {
- Log(undef, "job output $collated_output");
- $Job->update_attributes('output' => $collated_output);
-}
-
+my $collated_output = save_output_collection();
Log (undef, "finish");
save_meta();
if (!$task_success)
{
my $temporary_fail;
- $temporary_fail ||= $Jobstep->{node_fail};
+ $temporary_fail ||= $Jobstep->{tempfail};
$temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
++$thisround_failed;
++$thisround_succeeded;
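+    # A success clears this node's failure bookkeeping, so future
+    # dispatch decisions start from a clean slate.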
$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
$slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
push @jobstep_done, $jobstepid;
Log ($jobstepid, "success in $elapsed seconds");
}
sub check_squeue
{
- # return if the kill list was checked <4 seconds ago
- if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
- {
- return;
- }
- $squeue_kill_checked = time;
+ my $last_squeue_check = $squeue_checked;
- # use killem() on procs whose killtime is reached
- for (keys %proc)
+ # Do not call `squeue` or check the kill list more than once every
+ # 15 seconds.
+ return if $last_squeue_check > time - 15;
+ $squeue_checked = time;
+
+ # Look for children from which we haven't received stderr data since
+ # the last squeue check. If no such children exist, all procs are
+ # alive and there's no need to even look at squeue.
+ #
+ # As long as the crunchstat poll interval (10s) is shorter than the
+ # squeue check interval (15s) this should make the squeue check an
+ # infrequent event.
+ my $silent_procs = 0;
+  for my $procinfo (values %proc)
{
- if (exists $proc{$_}->{killtime}
- && $proc{$_}->{killtime} <= time)
+    # stderr_at is recorded on the jobstep, not on the proc entry.
+    my $jobstep = $jobstep[$procinfo->{jobstep}];
+    if ($jobstep->{stderr_at} < $last_squeue_check)
{
- killem ($_);
+ $silent_procs++;
}
}
+ return if $silent_procs == 0;
- # return if the squeue was checked <60 seconds ago
- if (defined $squeue_checked && $squeue_checked > time - 60)
+ # use killem() on procs whose killtime is reached
+  while (my ($pid, $procinfo) = each %proc)
{
- return;
+    my $jobstep = $jobstep[$procinfo->{jobstep}];
+    if (exists $procinfo->{killtime}
+      && $procinfo->{killtime} <= time
+      && $jobstep->{stderr_at} < $last_squeue_check)
+ {
+ my $sincewhen = "";
+ if ($jobstep->{stderr_at}) {
+ $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+ }
+      Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+ killem ($pid);
+ }
}
- $squeue_checked = time;
if (!$have_slurm)
{
return;
}
- # get a list of steps still running
- my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
- chop @squeue;
- if ($squeue[-1] ne "ok")
+ # Get a list of steps still running. Note: squeue(1) says --steps
+ # selects a format (which we override anyway) and allows us to
+ # specify which steps we're interested in (which we don't).
+ # Importantly, it also changes the meaning of %j from "job name" to
+ # "step name" and (although this isn't mentioned explicitly in the
+ # docs) switches from "one line per job" mode to "one line per step"
+ # mode. Without it, we'd just get a list of one job, instead of a
+ # list of N steps.
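+  # Each output line is one step name; below we match those names
+  # against the jobstepname recorded when each srun child was started.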
+ my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
+ if ($? != 0)
{
+ Log(undef, "warning: squeue exit status $? ($!)");
return;
}
- pop @squeue;
+ chop @squeue;
# which of my jobsteps are running, according to squeue?
my %ok;
- foreach (@squeue)
+ for my $jobstepname (@squeue)
{
- if (/^(\d+)\.(\d+) (\S+)/)
- {
- if ($1 eq $ENV{SLURM_JOBID})
- {
- $ok{$3} = 1;
- }
- }
+ $ok{$jobstepname} = 1;
}
- # which of my active child procs (>60s old) were not mentioned by squeue?
- foreach (keys %proc)
+ # Check for child procs >60s old and not mentioned by squeue.
+  while (my ($pid, $procinfo) = each %proc)
{
- if ($proc{$_}->{time} < time - 60
- && !exists $ok{$proc{$_}->{jobstepname}}
- && !exists $proc{$_}->{killtime})
+    if ($procinfo->{time} < time - 60
+      && $procinfo->{jobstepname}
+      && !exists $ok{$procinfo->{jobstepname}}
+      && !exists $procinfo->{killtime})
{
- # kill this proc if it hasn't exited in 30 seconds
- $proc{$_}->{killtime} = time + 30;
+ # According to slurm, this task has ended (successfully or not)
+ # -- but our srun child hasn't exited. First we must wait (30
+ # seconds) in case this is just a race between communication
+ # channels. Then, if our srun child process still hasn't
+ # terminated, we'll conclude some slurm communication
+ # error/delay has caused the task to die without notifying srun,
+ # and we'll kill srun ourselves.
+      $procinfo->{killtime} = time + 30;
+      Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
}
}
}
if ($have_slurm)
{
Log (undef, "release job allocation");
- system "scancel $ENV{SLURM_JOBID}";
+ system "scancel $ENV{SLURM_JOB_ID}";
}
}
while (0 < sysread ($reader{$job}, $buf, 8192))
{
print STDERR $buf if $ENV{CRUNCH_DEBUG};
+ $jobstep[$job]->{stderr_at} = time;
$jobstep[$job]->{stderr} .= $buf;
preprocess_stderr ($job);
if (length ($jobstep[$job]->{stderr}) > 16384)
# whoa.
$main::please_freeze = 1;
}
- elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
- $jobstep[$job]->{node_fail} = 1;
+ elsif ($line =~ /srun: error: Node failure on/) {
+ my $job_slot_index = $jobstep[$job]->{slotindex};
+ $slot[$job_slot_index]->{node}->{fail_count}++;
+ $jobstep[$job]->{tempfail} = 1;
+ ban_node_by_slot($job_slot_index);
+ }
+ elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
+ $jobstep[$job]->{tempfail} = 1;
ban_node_by_slot($jobstep[$job]->{slotindex});
}
+ elsif ($line =~ /arvados\.errors\.Keep/) {
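+    # A Keep exception in the task's stderr is treated as transient
+    # (e.g. momentary keepstore or network trouble), so mark the task
+    # tempfail rather than counting it as a permanent failure.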
+ $jobstep[$job]->{tempfail} = 1;
+ }
}
}
return $joboutput;
}
+# Calls create_output_collection, logs the result, and returns it.
+# If that was successful, save that as the output in the job record.
+sub save_output_collection {
+ my $collated_output = create_output_collection();
+
+ if (!$collated_output) {
+ Log(undef, "Failed to write output collection");
+ }
+ else {
+ Log(undef, "job output $collated_output");
+ $Job->update_attributes('output' => $collated_output);
+ }
+ return $collated_output;
+}
sub killem
{
# Send log output to Keep via arv-put.
#
# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_out_buf is a string containing all output read from arv-put so far.
+# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
# $log_pipe_pid is the pid of the arv-put subprocess.
#
# The only functions that should access these variables directly are:
# Starts an arv-put pipe, reading data on stdin and writing it to
# a $logfilename file in an output collection.
#
+# log_writer_read_output([$timeout])
+# Read output from $log_pipe_out and append it to $log_pipe_out_buf.
+# Passes $timeout to the select() call, with a default of 0.01.
+# Returns the result of the last read() call on $log_pipe_out, or
+# -1 if read() wasn't called because select() timed out.
+# Only other log_writer_* functions should need to call this.
+#
# log_writer_send($txt)
# Writes $txt to the output log collection.
#
# Returns a true value if there is currently a live arv-put
# process, false otherwise.
#
-my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
+ $log_pipe_pid);
sub log_writer_start($)
{
my $logfilename = shift;
$log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
'arv-put',
- '--portable-data-hash',
- '--project-uuid', $Job->{owner_uuid},
+ '--stream',
'--retries', '3',
- '--name', $logfilename,
'--filename', $logfilename,
'-');
+ $log_pipe_out_buf = "";
+ $log_pipe_out_select = IO::Select->new($log_pipe_out);
+}
+
+sub log_writer_read_output {
+ my $timeout = shift || 0.01;
+ my $read = -1;
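+  # read() appends to $log_pipe_out_buf at its current end; it returns
+  # 0 at EOF and undef on error, either of which ends the loop.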
+ while ($read && $log_pipe_out_select->can_read($timeout)) {
+ $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
+ length($log_pipe_out_buf));
+ }
+ if (!defined($read)) {
+ Log(undef, "error reading log manifest from arv-put: $!");
+ }
+ return $read;
}
sub log_writer_send($)
{
my $txt = shift;
print $log_pipe_in $txt;
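+  # Drain anything arv-put has written so far, so a full pipe buffer
+  # can't block arv-put (and, in turn, this process) mid-job.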
+ log_writer_read_output();
}
sub log_writer_finish()
{
return unless $log_pipe_pid;
close($log_pipe_in);
- my $arv_put_output;
- my $s = IO::Select->new($log_pipe_out);
- if ($s->can_read(120)) {
- sysread($log_pipe_out, $arv_put_output, 1024);
- chomp($arv_put_output);
- } else {
+ my $read_result = log_writer_read_output(120);
+ if ($read_result == -1) {
Log (undef, "timed out reading from 'arv-put'");
+ } elsif ($read_result != 0) {
+ Log(undef, "failed to read arv-put log manifest to EOF");
}
waitpid($log_pipe_pid, 0);
- $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
if ($?) {
- Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?));
}
+ close($log_pipe_out);
+ my $arv_put_output = $log_pipe_out_buf;
+ $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
+ $log_pipe_out_select = undef;
+
return $arv_put_output;
}
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
return unless log_writer_is_active();
- my $loglocator = log_writer_finish();
- Log (undef, "log manifest is $loglocator");
- $Job->{'log'} = $loglocator;
- $Job->update_attributes('log', $loglocator);
+ my $log_manifest = "";
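+  # If the job already has a log collection (e.g. from a previous
+  # attempt), carry its manifest forward so the new collection holds
+  # the complete log history.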
+ if ($Job->{log}) {
+ my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
+ $log_manifest .= $prev_log_coll->{manifest_text};
+ }
+ $log_manifest .= log_writer_finish();
+
+ my $log_coll = api_call(
+ "collections/create", ensure_unique_name => 1, collection => {
+ manifest_text => $log_manifest,
+ owner_uuid => $Job->{owner_uuid},
+ name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
+ });
+ Log(undef, "log collection is " . $log_coll->{portable_data_hash});
+ $Job->update_attributes('log' => $log_coll->{portable_data_hash});
}
}
__DATA__
-#!/usr/bin/perl
+#!/usr/bin/env perl
#
# This is crunch-job's internal dispatch script. crunch-job running on the API
# server invokes this script on individual compute nodes, or localhost if we're
}
my $python_dir = "$install_dir/python";
-if ((-d $python_dir) and can_run("python2.7") and
- (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
- # egg_info failed, probably when it asked git for a build tag.
- # Specify no build tag.
- open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
- print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
- close($pysdk_cfg);
+if ((-d $python_dir) and can_run("python2.7")) {
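+  # Redirection order matters here: `2>&1 >/dev/null` first points
+  # stderr at the pipe (the current stdout), then discards stdout, so
+  # we capture only egg_info's error messages.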
+ open(my $egg_info_pipe, "-|",
+ "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
+ my @egg_info_errors = <$egg_info_pipe>;
+ close($egg_info_pipe);
+ if ($?) {
+ if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
+ # egg_info apparently failed because it couldn't ask git for a build tag.
+ # Specify no build tag.
+ open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
+ print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
+ close($pysdk_cfg);
+ } else {
+ my $egg_info_exit = $? >> 8;
+ foreach my $errline (@egg_info_errors) {
+ print STDERR_ORIG $errline;
+ }
+ warn "python setup.py egg_info failed: exit $egg_info_exit";
+ exit ($egg_info_exit || 1);
+ }
+ }
}
# Hide messages from the install script (unless it fails: shell_or_die