@freeslot = (0..$#slot);
}
my $round_num_freeslots = scalar(@freeslot);
+print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
my %round_max_slots = ();
for (my $ii = $#freeslot; $ii >= 0; $ii--) {
# reapchildren()
#
# Reap, without blocking, every child process that has already
# terminated, and do the end-of-task bookkeeping for each one:
#
#   * reload the task record and decide success/failure (a task that
#     never set its "success" field is recorded as failed);
#   * on failure, update the per-round failure counters and the node's
#     losing streak (possibly banning the node), requeue the task, and
#     flag the whole job as failed when the failure is permanent or the
#     task has failed three times;
#   * on success, reset the node's failure tracking and queue any tasks
#     that were created by the finished task;
#   * save the finish time, flush remaining stderr, close the task's
#     reader handle and return its slot to @freeslot.
#
# Takes no arguments.  Returns the number of children reaped during this
# call (0 when no child was ready), so the result can also be used as a
# boolean "did anything finish?".
sub reapchildren
{
  my $children_reaped = 0;

  # waitpid() is called with WNOHANG only (no WUNTRACED/WCONTINUED), so
  # every pid it returns belongs to a child that has really terminated,
  # either by exiting or by being killed by a signal.  Such a pid is
  # never returned a second time, so each one must be fully processed
  # here: skipping a child because WIFEXITED() is false would leave its
  # %proc entry, slot and reader handle allocated indefinitely.
  while ((my $pid = waitpid (-1, WNOHANG)) > 0)
  {
    # Capture $? right away, before any other call can overwrite it.
    my $childstatus = $?;
    $children_reaped++;

    my $slotidx = $proc{$pid}->{slot};
    my $node = $slot[$slotidx]->{node};
    my $whatslot = $node->{name} . "." . $slot[$slotidx]->{cpu};
    my $jobstepidx = $proc{$pid}->{jobstepidx};
    my $elapsed = time - $proc{$pid}->{time};
    my $Jobstep = $jobstep[$jobstepidx];

    my $exitvalue = $childstatus >> 8;
    my $exitinfo = "exit ".exit_status_s($childstatus);
    $Jobstep->{'arvados_task'}->reload;
    my $task_success = $Jobstep->{'arvados_task'}->{success};

    Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");

    if (!defined $task_success) {
      # task did not indicate one way or the other --> fail
      Log($jobstepidx, sprintf(
            "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
            exit_status_s($childstatus)));
      $Jobstep->{'arvados_task'}->{success} = 0;
      $Jobstep->{'arvados_task'}->save;
      $task_success = 0;
    }

    if (!$task_success)
    {
      my $temporary_fail =
          $Jobstep->{tempfail} || ($exitvalue == TASK_TEMPFAIL);

      ++$thisround_failed;
      ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;

      # Check for signs of a failed or misconfigured node
      if (++$node->{losing_streak} >= 2 + $node->{ncpus}) {
        # Don't count this against jobstep failure thresholds if this
        # node is already suspected faulty and srun exited quickly
        if ($node->{hold_until} && $elapsed < 5) {
          Log ($jobstepidx, "blaming failure on suspect node " .
               $node->{name});
          $temporary_fail ||= 1;
        }
        ban_node_by_slot($slotidx);
      }

      Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
                                ++$Jobstep->{'failures'},
                                $temporary_fail ? 'temporary' : 'permanent',
                                $elapsed));

      if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
        # Give up on this task, and the whole job
        $main::success = 0;
      }
      # Put this task back on the todo queue
      push @jobstep_todo, $jobstepidx;
      $Job->{'tasks_summary'}->{'failed'}++;
    }
    else
    {
      ++$thisround_succeeded;
      # A success clears every failure indicator kept for this node.
      $node->{losing_streak} = 0;
      $node->{hold_until} = 0;
      $node->{fail_count} = 0;
      push @jobstep_done, $jobstepidx;
      Log ($jobstepidx, "success in $elapsed seconds");
    }

    $Jobstep->{exitcode} = $childstatus;
    $Jobstep->{finishtime} = time;
    $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
    $Jobstep->{'arvados_task'}->save;
    process_stderr_final ($jobstepidx);
    Log ($jobstepidx, sprintf("task output (%d bytes): %s",
                              length($Jobstep->{'arvados_task'}->{output}),
                              $Jobstep->{'arvados_task'}->{output}));

    # Release everything that was tied to the finished process.
    close $reader{$jobstepidx};
    delete $reader{$jobstepidx};
    delete $slot[$slotidx]->{pid};
    push @freeslot, $slotidx;
    delete $proc{$pid};

    if ($task_success) {
      # Load new tasks: fetch the tasks created by the finished task one
      # page at a time, using the number already fetched as the offset,
      # until the API returns an empty page.
      my $newtask_list = [];
      my $newtask_results;
      do {
        $newtask_results = api_call(
          "job_tasks/list",
          'where' => {
            'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
          },
          'order' => 'qsequence',
          'offset' => scalar(@$newtask_list),
        );
        push(@$newtask_list, @{$newtask_results->{items}});
      } while (@{$newtask_results->{items}});
      foreach my $arvados_task (@$newtask_list) {
        my $jobstep = {
          'level' => $arvados_task->{'sequence'},
          'failures' => 0,
          'arvados_task' => $arvados_task
        };
        push @jobstep, $jobstep;
        push @jobstep_todo, $#jobstep;
      }
    }
    $progress_is_dirty = 1;
  }
  return $children_reaped;
}
sub check_refresh_wanted
# squeue check interval (15s) this should make the squeue check an
# infrequent event.
my $silent_procs = 0;
- for my $procinfo (values %proc)
+ for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
{
- my $jobstep = $jobstep[$procinfo->{jobstepidx}];
- if ($jobstep->{stderr_at} < $last_squeue_check)
+ if (!exists($js->{stderr_at}))
+ {
+ $js->{stderr_at} = 0;
+ }
+ if ($js->{stderr_at} < $last_squeue_check)
{
$silent_procs++;
}
# use killem() on procs whose killtime is reached
while (my ($pid, $procinfo) = each %proc)
{
- my $jobstep = $jobstep[$procinfo->{jobstepidx}];
+ my $js = $jobstep[$procinfo->{jobstepidx}];
if (exists $procinfo->{killtime}
&& $procinfo->{killtime} <= time
- && $jobstep->{stderr_at} < $last_squeue_check)
+ && $js->{stderr_at} < $last_squeue_check)
{
my $sincewhen = "";
- if ($jobstep->{stderr_at}) {
- $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+ if ($js->{stderr_at}) {
+ $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
}
Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
killem ($pid);
sub readfrompipes
{
my $gotsome = 0;
+ my %fd_job;
+ my $sel = IO::Select->new();
foreach my $jobstepidx (keys %reader)
+ {
+ my $fd = $reader{$jobstepidx};
+ $sel->add($fd);
+ $fd_job{$fd} = $jobstepidx;
+
+ if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
+ $sel->add($stdout_fd);
+ $fd_job{$stdout_fd} = $jobstepidx;
+ }
+ }
+ # select on all reader fds with 0.1s timeout
+ my @ready_fds = $sel->can_read(0.1);
+ foreach my $fd (@ready_fds)
{
my $buf;
- if ($jobstep[$jobstepidx]->{stdout_r} &&
- 0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536))
+ if (0 < sysread ($fd, $buf, 65536))
{
+ $gotsome = 1;
print STDERR $buf if $ENV{CRUNCH_DEBUG};
- if (exists $jobstep[$jobstepidx]->{stdout_captured}) {
+
+ my $jobstepidx = $fd_job{$fd};
+ if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
$jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+ next;
}
- $gotsome = 1;
- }
- if (0 < sysread ($reader{$jobstepidx}, $buf, 65536))
- {
- print STDERR $buf if $ENV{CRUNCH_DEBUG};
+
$jobstep[$jobstepidx]->{stderr_at} = time;
$jobstep[$jobstepidx]->{stderr} .= $buf;
- if (exists $jobstep[$jobstepidx]->{stderr_captured}) {
- $jobstep[$jobstepidx]->{stderr_captured} .= $buf;
- }
- $gotsome = 1;
# Consume everything up to the last \n
preprocess_stderr ($jobstepidx);