X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b976f74c6ef339eba1695ba38a5e52845c7d201f..eff9d0deac0e06f7310ba1463aab89b342e71aa9:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 20b65af998..655bd15242 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -58,7 +58,8 @@ Save a checkpoint and continue. =item SIGHUP Refresh node allocation (i.e., check whether any nodes have been added -or unallocated). Currently this is a no-op. +or unallocated) and attributes of the Job record that should affect +behavior (e.g., cancel job if cancelled_at becomes non-nil). =back @@ -107,10 +108,6 @@ my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/; my $local_job = !$job_has_uuid; -$SIG{'HUP'} = sub -{ - 1; -}; $SIG{'USR1'} = sub { $main::ENV{CRUNCH_DEBUG} = 1; @@ -122,10 +119,8 @@ $SIG{'USR2'} = sub -my $arv = Arvados->new; -my $metastream = Warehouse::Stream->new(whc => new Warehouse); -$metastream->clear; -$metastream->write_start('log.txt'); +my $arv = Arvados->new('apiVersion' => 'v1'); +my $metastream; my $User = $arv->{'users'}->{'current'}->execute; @@ -170,6 +165,10 @@ else } $job_id = $Job->{'uuid'}; +$metastream = Warehouse::Stream->new(whc => new Warehouse); +$metastream->clear; +$metastream->name('.'); +$metastream->write_start($job_id . '.log.txt'); $Job->{'runtime_constraints'} ||= {}; @@ -257,20 +256,17 @@ my $jobmanager_id; if ($job_has_uuid) { # Claim this job, and make sure nobody else does - - $Job->{'is_locked_by_uuid'} = $User->{'uuid'}; - $Job->{'started_at'} = gmtime; - $Job->{'running'} = 1; - $Job->{'success'} = undef; - $Job->{'tasks_summary'} = { 'failed' => 0, - 'todo' => 1, - 'running' => 0, - 'done' => 0 }; - if ($job_has_uuid) { - unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) { - croak("Error while updating / locking job"); - } + unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) && + $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) { + croak("Error while updating / locking job"); } + $Job->update_attributes('started_at' => scalar gmtime, + 'running' => 1, + 'success' => undef, + 'tasks_summary' => { 'failed' => 0, + 'todo' => 1, + 'running' => 0, + 'done' => 0 }); } @@ -281,9 +277,12 @@ $SIG{'TERM'} = \&croak; $SIG{'TSTP'} = sub { $main::please_freeze = 1; }; $SIG{'ALRM'} = sub { $main::please_info = 1; }; $SIG{'CONT'} = sub { $main::please_continue = 1; }; +$SIG{'HUP'} = sub { $main::please_refresh = 1; }; + $main::please_freeze = 0; $main::please_info = 0; $main::please_continue = 0; +$main::please_refresh = 0; my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs}); @@ -299,6 +298,7 @@ my $jobstep_tomerge_level = 0; my $squeue_checked; my $squeue_kill_checked; my $output_in_keep = 0; +my $latest_refresh = scalar time; @@ -315,7 +315,7 @@ else 'parameters' => {}, }); push @jobstep, { 'level' => 0, - 'attempts' => 0, + 'failures' => 0, 'arvados_task' => $first_task, }; push @jobstep_todo, 0; @@ -341,13 +341,13 @@ else Log (undef, "Install revision ".$Job->{script_version}); my $nodelist = join(",", @node); - # Clean out crunch_tmp/work and crunch_tmp/opt + # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src* my $cleanpid = fork(); if ($cleanpid == 0) { srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']); + ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']); exit (1); } while (1) @@ -421,7 +421,9 @@ else Log (undef, "Using commit $commit for tree-ish $treeish"); if ($commit ne $treeish) { $Job->{'script_version'} = $commit; - !$job_has_uuid or $Job->save() or croak("Error while updating job"); + !$job_has_uuid or + $Job->update_attributes('script_version' => $commit) or + croak("Error while updating job"); } } } @@ -467,7 +469,7 @@ foreach (split (/\n/, $Job->{knobs})) -my $success; +$main::success = undef; @@ -504,12 +506,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { next; } - if ($Jobstep->{attempts} > 2) - { - Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up"); - $success = 0; - last THISROUND; - } pipe $reader{$id}, "writer" or croak ($!); my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!); @@ -564,7 +560,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) my @execargs = qw(sh); my $build_script_to_send = ""; my $command = - "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} " + "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " + ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} " ."&& cd $ENV{CRUNCH_TMP} "; if ($build_script) { @@ -579,7 +576,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; my @execargs = ('bash', '-c', $command); srun (\@srunargs, \@execargs, undef, $build_script_to_send); - exit (1); + exit (111); } close("writer"); if (!defined $childpid) @@ -599,7 +596,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'}); Log ($id, "child $childpid started on $childslotname"); - $Jobstep->{attempts} ++; $Jobstep->{starttime} = time; $Jobstep->{node} = $childnode->{name}; $Jobstep->{slotindex} = $childslot; @@ -629,6 +625,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) + reapchildren (); if (!$gotsome) { + check_refresh_wanted(); check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); @@ -685,6 +682,7 @@ while (%proc) readfrompipes (); if (!reapchildren()) { + check_refresh_wanted(); check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); @@ -696,7 +694,7 @@ update_progress_stats(); freeze_if_want_freeze(); -if (!defined $success) +if (!defined $main::success) { if (@jobstep_todo && $thisround_succeeded == 0 && @@ -704,25 +702,25 @@ if (!defined $success) { my $message = "stop because $thisround_failed tasks failed and none succeeded"; Log (undef, $message); - $success = 0; + $main::success = 0; } if (!@jobstep_todo) { - $success = 1; + $main::success = 1; } } -goto ONELEVEL if !defined $success; +goto ONELEVEL if !defined $main::success; release_allocation(); freeze(); -$Job->reload; -$Job->{'output'} = &collate_output(); -$Job->{'running'} = 0; -$Job->{'success'} = $Job->{'output'} && $success; -$Job->{'finished_at'} = gmtime; -$Job->save if $job_has_uuid; +if ($job_has_uuid) { + $Job->update_attributes('output' => &collate_output(), + 'running' => 0, + 'success' => $Job->{'output'} && $main::success, + 'finished_at' => scalar gmtime) +} if ($Job->{'output'}) { @@ -756,7 +754,9 @@ sub update_progress_stats $Job->{'tasks_summary'}->{'todo'} = $todo; $Job->{'tasks_summary'}->{'done'} = $done; $Job->{'tasks_summary'}->{'running'} = $running; - $Job->save if $job_has_uuid; + if ($job_has_uuid) { + $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'}); + } Log (undef, "status: $done done, $running running, $todo todo"); $progress_is_dirty = 0; } @@ -775,27 +775,32 @@ sub reapchildren my $elapsed = time - $proc{$pid}->{time}; my $Jobstep = $jobstep[$jobstepid]; - my $exitcode = $?; - my $exitinfo = "exit $exitcode"; + my $childstatus = $?; + my $exitvalue = $childstatus >> 8; + my $exitinfo = sprintf("exit %d signal %d%s", + $exitvalue, + $childstatus & 127, + ($childstatus & 128 ? ' core dump' : '')); $Jobstep->{'arvados_task'}->reload; - my $success = $Jobstep->{'arvados_task'}->{success}; + my $task_success = $Jobstep->{'arvados_task'}->{success}; - Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success"); + Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success"); - if (!defined $success) { + if (!defined $task_success) { # task did not indicate one way or the other --> fail $Jobstep->{'arvados_task'}->{success} = 0; $Jobstep->{'arvados_task'}->save; - $success = 0; + $task_success = 0; } - if (!$success) + if (!$task_success) { - my $no_incr_attempts; - $no_incr_attempts = 1 if $Jobstep->{node_fail}; + my $temporary_fail; + $temporary_fail ||= $Jobstep->{node_fail}; + $temporary_fail ||= ($exitvalue == 111); ++$thisround_failed; - ++$thisround_failed_multiple if $Jobstep->{attempts} > 1; + ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1; # Check for signs of a failed or misconfigured node if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= @@ -803,18 +808,28 @@ sub reapchildren # Don't count this against jobstep failure thresholds if this # node is already suspected faulty and srun exited quickly if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && - $elapsed < 5 && - $Jobstep->{attempts} > 1) { - Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts"); - $no_incr_attempts = 1; + $elapsed < 5) { + Log ($jobstepid, "blaming failure on suspect node " . + $slot[$proc{$pid}->{slot}]->{node}->{name}); + $temporary_fail ||= 1; } ban_node_by_slot($proc{$pid}->{slot}); } - push @jobstep_todo, $jobstepid; - Log ($jobstepid, "failure in $elapsed seconds"); + Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds', + ++$Jobstep->{'failures'}, + $temporary_fail ? 'temporary ' : 'permanent', + $elapsed)); - --$Jobstep->{attempts} if $no_incr_attempts; + if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { + # Give up on this task, and the whole job + $main::success = 0; + $main::please_freeze = 1; + } + else { + # Put this task back on the todo queue + push @jobstep_todo, $jobstepid; + } $Job->{'tasks_summary'}->{'failed'}++; } else @@ -825,9 +840,9 @@ sub reapchildren push @jobstep_done, $jobstepid; Log ($jobstepid, "success in $elapsed seconds"); } - $Jobstep->{exitcode} = $exitcode; + $Jobstep->{exitcode} = $childstatus; $Jobstep->{finishtime} = time; - process_stderr ($jobstepid, $success); + process_stderr ($jobstepid, $task_success); Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output}); close $reader{$jobstepid}; @@ -846,7 +861,7 @@ sub reapchildren foreach my $arvados_task (@{$newtask_list->{'items'}}) { my $jobstep = { 'level' => $arvados_task->{'sequence'}, - 'attempts' => 0, + 'failures' => 0, 'arvados_task' => $arvados_task }; push @jobstep, $jobstep; @@ -857,6 +872,27 @@ sub reapchildren 1; } +sub check_refresh_wanted +{ + my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"}; + if (@stat && $stat[9] > $latest_refresh) { + $latest_refresh = scalar time; + if ($job_has_uuid) { + my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + for my $attr ('cancelled_at', + 'cancelled_by_user_uuid', + 'cancelled_by_client_uuid') { + $Job->{$attr} = $Job2->{$attr}; + } + if ($Job->{'cancelled_at'}) { + Log (undef, "Job cancelled at " . $Job->{cancelled_at} . + " by user " . $Job->{cancelled_by_user_uuid}); + $main::success = 0; + $main::please_freeze = 1; + } + } + } +} sub check_squeue { @@ -981,7 +1017,7 @@ sub preprocess_stderr sub process_stderr { my $job = shift; - my $success = shift; + my $task_success = shift; preprocess_stderr ($job); map { @@ -1021,15 +1057,14 @@ sub collate_output { my $errstr = $whc->errstr; $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n"); - $success = 0; + $main::success = 0; } } $joboutput = $whc->write_finish if !defined $joboutput; if ($joboutput) { Log (undef, "output $joboutput"); - $Job->{'output'} = $joboutput; - $Job->save if $job_has_uuid; + $Job->update_attributes('output' => $joboutput) if $job_has_uuid; } else { @@ -1121,11 +1156,9 @@ sub croak sub cleanup { return if !$job_has_uuid; - $Job->reload; - $Job->{'running'} = 0; - $Job->{'success'} = 0; - $Job->{'finished_at'} = gmtime; - $Job->save; + $Job->update_attributes('running' => 0, + 'success' => 0, + 'finished_at' => scalar gmtime); } @@ -1135,11 +1168,16 @@ sub save_meta my $m = $metastream; $m = $m->copy if $justcheckpoint; $m->write_finish; - my $loglocator = $m->as_key; + my $whc = Warehouse->new; + my $loglocator = $whc->store_block ($m->as_string); + $arv->{'collections'}->{'create'}->execute('collection' => { + 'uuid' => $loglocator, + 'manifest_text' => $m->as_string, + }); undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it - Log (undef, "meta key is $loglocator"); + Log (undef, "log manifest is $loglocator"); $Job->{'log'} = $loglocator; - $Job->save if $job_has_uuid; + $Job->update_attributes('log', $loglocator) if $job_has_uuid; } @@ -1223,7 +1261,7 @@ sub thaw my ($k, $v) = split ("=", $_, 2); $Jobstep->{$k} = freezeunquote ($v) if $k; } - $Jobstep->{attempts} = 0; + $Jobstep->{'failures'} = 0; push @jobstep, $Jobstep; if ($Jobstep->{exitcode} eq "0") @@ -1313,7 +1351,7 @@ my $repo = $ENV{"CRUNCH_SRC_URL"}; open L, ">", "$destdir.lock" or die "$destdir.lock: $!"; flock L, LOCK_EX; -if (readlink ("$destdir.commit") eq $commit) { +if (readlink ("$destdir.commit") eq $commit && -d $destdir) { exit 0; }