From: Tom Clegg Date: Mon, 29 Feb 2016 21:00:39 +0000 (-0500) Subject: 8099: 7263: Merge branch 'hgi/7263-even-better-busy-behavior' of github.com:wtsi... X-Git-Tag: 1.1.0~1089^2~2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/fbd54468b13466839c24d880a3d041d0a49371af 8099: 7263: Merge branch 'hgi/7263-even-better-busy-behavior' of github.com:wtsi-hgi/arvados into 8099-babysit-all-srun Conflicts: sdk/cli/bin/crunch-job --- fbd54468b13466839c24d880a3d041d0a49371af diff --cc sdk/cli/bin/crunch-job index b63886e105,c8a1de9c65..ca9db1dacd --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@@ -1130,122 -1181,133 +1131,133 @@@ sub update_progress_stat sub reapchildren { - my $pid = waitpid (-1, WNOHANG); - return 0 if $pid <= 0; - - my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} - . "." - . $slot[$proc{$pid}->{slot}]->{cpu}); - my $jobstepidx = $proc{$pid}->{jobstepidx}; - my $elapsed = time - $proc{$pid}->{time}; - my $Jobstep = $jobstep[$jobstepidx]; - - my $childstatus = $?; - my $exitvalue = $childstatus >> 8; - my $exitinfo = "exit ".exit_status_s($childstatus); - $Jobstep->{'arvados_task'}->reload; - my $task_success = $Jobstep->{'arvados_task'}->{success}; - - Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success"); - - if (!defined $task_success) { - # task did not indicate one way or the other --> fail - Log($jobstepidx, sprintf( - "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.", - exit_status_s($childstatus))); - $Jobstep->{'arvados_task'}->{success} = 0; - $Jobstep->{'arvados_task'}->save; - $task_success = 0; - } - - if (!$task_success) + my $children_reaped = 0; - - while((my $pid = waitpid (-1, WNOHANG)) > 0) ++ while ((my $pid = waitpid (-1, WNOHANG)) > 0) { - my $temporary_fail; - $temporary_fail ||= $Jobstep->{tempfail}; - $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL); - - ++$thisround_failed; - ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1; - - # Check for signs of a failed or misconfigured node - if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= - 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { - # Don't count this against jobstep failure thresholds if this - # node is already suspected faulty and srun exited quickly - if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && - $elapsed < 5) { - Log ($jobstepidx, "blaming failure on suspect node " . - $slot[$proc{$pid}->{slot}]->{node}->{name}); - $temporary_fail ||= 1; - } - ban_node_by_slot($proc{$pid}->{slot}); + my $childstatus = $?; ++ + my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} + . "." + . $slot[$proc{$pid}->{slot}]->{cpu}); - my $jobstepid = $proc{$pid}->{jobstep}; ++ my $jobstepidx = $proc{$pid}->{jobstepidx}; + + if (!WIFEXITED($childstatus)) + { + # child did not exit (may be temporarily stopped) - Log ($jobstepid, "child $pid did not actually exit in reapchildren, ignoring for now."); ++ Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now."); + next; } - Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds', - ++$Jobstep->{'failures'}, - $temporary_fail ? 'temporary' : 'permanent', - $elapsed)); + $children_reaped++; + my $elapsed = time - $proc{$pid}->{time}; - my $Jobstep = $jobstep[$jobstepid]; ++ my $Jobstep = $jobstep[$jobstepidx]; + + my $exitvalue = $childstatus >> 8; + my $exitinfo = "exit ".exit_status_s($childstatus); + $Jobstep->{'arvados_task'}->reload; + my $task_success = $Jobstep->{'arvados_task'}->{success}; + - Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success"); ++ Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success"); + + if (!defined $task_success) { + # task did not indicate one way or the other --> fail - Log($jobstepid, sprintf( ++ Log($jobstepidx, sprintf( + "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.", + exit_status_s($childstatus))); + $Jobstep->{'arvados_task'}->{success} = 0; + $Jobstep->{'arvados_task'}->save; + $task_success = 0; + } - if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { - # Give up on this task, and the whole job - $main::success = 0; + if (!$task_success) + { + my $temporary_fail; + $temporary_fail ||= $Jobstep->{tempfail}; + $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL); + + ++$thisround_failed; + ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1; + + # Check for signs of a failed or misconfigured node + if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= + 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { + # Don't count this against jobstep failure thresholds if this + # node is already suspected faulty and srun exited quickly + if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && + $elapsed < 5) { - Log ($jobstepid, "blaming failure on suspect node " . ++ Log ($jobstepidx, "blaming failure on suspect node " . + $slot[$proc{$pid}->{slot}]->{node}->{name}); + $temporary_fail ||= 1; + } + ban_node_by_slot($proc{$pid}->{slot}); + } + - Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds', - ++$Jobstep->{'failures'}, - $temporary_fail ? 'temporary' : 'permanent', - $elapsed)); ++ Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds', ++ ++$Jobstep->{'failures'}, ++ $temporary_fail ? 'temporary' : 'permanent', ++ $elapsed)); + + if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { + # Give up on this task, and the whole job + $main::success = 0; + } + # Put this task back on the todo queue - push @jobstep_todo, $jobstepid; ++ push @jobstep_todo, $jobstepidx; + $Job->{'tasks_summary'}->{'failed'}++; } - # Put this task back on the todo queue - push @jobstep_todo, $jobstepidx; - $Job->{'tasks_summary'}->{'failed'}++; - } - else - { - ++$thisround_succeeded; - $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; - $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; - $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0; - push @jobstep_done, $jobstepidx; - Log ($jobstepidx, "success in $elapsed seconds"); - } - $Jobstep->{exitcode} = $childstatus; - $Jobstep->{finishtime} = time; - $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); - $Jobstep->{'arvados_task'}->save; - process_stderr_final ($jobstepidx); - Log ($jobstepidx, sprintf("task output (%d bytes): %s", - length($Jobstep->{'arvados_task'}->{output}), - $Jobstep->{'arvados_task'}->{output})); - - close $reader{$jobstepidx}; - delete $reader{$jobstepidx}; - delete $slot[$proc{$pid}->{slot}]->{pid}; - push @freeslot, $proc{$pid}->{slot}; - delete $proc{$pid}; - - if ($task_success) { - # Load new tasks - my $newtask_list = []; - my $newtask_results; - do { - $newtask_results = api_call( - "job_tasks/list", - 'where' => { - 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} - }, - 'order' => 'qsequence', - 'offset' => scalar(@$newtask_list), - ); - push(@$newtask_list, @{$newtask_results->{items}}); - } while (@{$newtask_results->{items}}); - foreach my $arvados_task (@$newtask_list) { - my $jobstep = { - 'level' => $arvados_task->{'sequence'}, - 'failures' => 0, - 'arvados_task' => $arvados_task - }; - push @jobstep, $jobstep; - push @jobstep_todo, $#jobstep; + else + { + ++$thisround_succeeded; + $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; + $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; + $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0; - push @jobstep_done, $jobstepid; - Log ($jobstepid, "success in $elapsed seconds"); ++ push @jobstep_done, $jobstepidx; ++ Log ($jobstepidx, "success in $elapsed seconds"); } + $Jobstep->{exitcode} = $childstatus; + $Jobstep->{finishtime} = time; + $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); + $Jobstep->{'arvados_task'}->save; - process_stderr ($jobstepid, $task_success); - Log ($jobstepid, sprintf("task output (%d bytes): %s", - length($Jobstep->{'arvados_task'}->{output}), - $Jobstep->{'arvados_task'}->{output})); ++ process_stderr_final ($jobstepidx); ++ Log ($jobstepidx, sprintf("task output (%d bytes): %s", ++ length($Jobstep->{'arvados_task'}->{output}), ++ $Jobstep->{'arvados_task'}->{output})); + - close $reader{$jobstepid}; - delete $reader{$jobstepid}; ++ close $reader{$jobstepidx}; ++ delete $reader{$jobstepidx}; + delete $slot[$proc{$pid}->{slot}]->{pid}; + push @freeslot, $proc{$pid}->{slot}; + delete $proc{$pid}; + + if ($task_success) { + # Load new tasks + my $newtask_list = []; + my $newtask_results; + do { + $newtask_results = api_call( + "job_tasks/list", + 'where' => { + 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} + }, + 'order' => 'qsequence', + 'offset' => scalar(@$newtask_list), + ); + push(@$newtask_list, @{$newtask_results->{items}}); + } while (@{$newtask_results->{items}}); + foreach my $arvados_task (@$newtask_list) { + my $jobstep = { + 'level' => $arvados_task->{'sequence'}, + 'failures' => 0, + 'arvados_task' => $arvados_task + }; + push @jobstep, $jobstep; + push @jobstep_todo, $#jobstep; + } + } + $progress_is_dirty = 1; } - $progress_is_dirty = 1; - 1; + return $children_reaped; } sub check_refresh_wanted @@@ -1292,10 -1351,13 +1304,13 @@@ sub check_squeu # squeue check interval (15s) this should make the squeue check an # infrequent event. my $silent_procs = 0; - for my $procinfo (values %proc) - for my $js (map {$jobstep[$_->{jobstep}]} values %proc) ++ for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc) { - my $jobstep = $jobstep[$procinfo->{jobstepidx}]; - if ($jobstep->{stderr_at} < $last_squeue_check) + if (!exists($js->{stderr_at})) + { + $js->{stderr_at} = 0; + } + if ($js->{stderr_at} < $last_squeue_check) { $silent_procs++; } @@@ -1305,16 -1367,16 +1320,16 @@@ # use killem() on procs whose killtime is reached while (my ($pid, $procinfo) = each %proc) { - my $jobstep = $jobstep[$procinfo->{jobstepidx}]; - my $js = $jobstep[$procinfo->{jobstep}]; ++ my $js = $jobstep[$procinfo->{jobstepidx}]; if (exists $procinfo->{killtime} && $procinfo->{killtime} <= time - && $jobstep->{stderr_at} < $last_squeue_check) + && $js->{stderr_at} < $last_squeue_check) { my $sincewhen = ""; - if ($jobstep->{stderr_at}) { - $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s"; + if ($js->{stderr_at}) { + $sincewhen = " in last " . (time - $js->{stderr_at}) . "s"; } - Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)"); + Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)"); killem ($pid); } } @@@ -1383,32 -1446,30 +1398,42 @@@ sub release_allocatio sub readfrompipes { my $gotsome = 0; + my %fd_job; + my $sel = IO::Select->new(); - foreach my $job (keys %reader) + foreach my $jobstepidx (keys %reader) + { - my $fd = $reader{$job}; ++ my $fd = $reader{$jobstepidx}; + $sel->add($fd); - $fd_job{$fd} = $job; ++ $fd_job{$fd} = $jobstepidx; ++ ++ if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) { ++ $sel->add($stdout_fd); ++ $fd_job{$stdout_fd} = $jobstepidx; ++ } + } + # select on all reader fds with 0.1s timeout + my @ready_fds = $sel->can_read(0.1); + foreach my $fd (@ready_fds) { my $buf; - if ($jobstep[$jobstepidx]->{stdout_r} && - 0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) + if (0 < sysread ($fd, $buf, 65536)) { ++ $gotsome = 1; print STDERR $buf if $ENV{CRUNCH_DEBUG}; - if (exists $jobstep[$jobstepidx]->{stdout_captured}) { - my $job = $fd_job{$fd}; - $jobstep[$job]->{stderr_at} = time; - $jobstep[$job]->{stderr} .= $buf; ++ ++ my $jobstepidx = $fd_job{$fd}; ++ if ($jobstep[$jobstepidx]->{stdout_r} == $fd) { + $jobstep[$jobstepidx]->{stdout_captured} .= $buf; ++ next; + } - $gotsome = 1; - } - if (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) - { - print STDERR $buf if $ENV{CRUNCH_DEBUG}; ++ + $jobstep[$jobstepidx]->{stderr_at} = time; + $jobstep[$jobstepidx]->{stderr} .= $buf; - if (exists $jobstep[$jobstepidx]->{stderr_captured}) { - $jobstep[$jobstepidx]->{stderr_captured} .= $buf; - } - $gotsome = 1; # Consume everything up to the last \n - preprocess_stderr ($job); + preprocess_stderr ($jobstepidx); - if (length ($jobstep[$job]->{stderr}) > 16384) + if (length ($jobstep[$jobstepidx]->{stderr}) > 16384) { # If we get a lot of stderr without a newline, chop off the # front to avoid letting our buffer grow indefinitely.