2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or deallocated) and re-read the attributes of the Job record that affect
68 behavior (e.g., cancel the job if cancelled_at becomes non-null).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
# Establish the scratch directories and export them through %ENV.
# TMPDIR falls back to /tmp; CRUNCH_TMP, unless the caller already set
# it, falls back to "crunch-job" under TMPDIR.
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88 if ($ENV{"USER"} ne "crunch" && $< != 0) {
89 # Neither the "crunch" user nor uid 0: append the real uid ($<) so
# that each uid gets its own tmp dir.
90 $ENV{"CRUNCH_TMP"} .= "-$<";
# JOB_WORK and CRUNCH_INSTALL are both subdirectories of CRUNCH_TMP.
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated; same value as JOB_WORK
# NOTE(review): the return value of mkdir is discarded on this line.
96 mkdir ($ENV{"JOB_WORK"});
100 if (defined $ENV{"ARV_CLI"}) {
101 $arv_cli = $ENV{"ARV_CLI"};
113 GetOptions('force-unlock' => \$force_unlock,
114 'git-dir=s' => \$git_dir,
115 'job=s' => \$jobspec,
116 'job-api-token=s' => \$job_api_token,
117 'no-clear-tmp' => \$no_clear_tmp,
118 'resume-stash=s' => \$resume_stash,
121 if (defined $job_api_token) {
122 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
126 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
127 my $local_job = !$job_has_uuid;
132 $main::ENV{CRUNCH_DEBUG} = 1;
136 $main::ENV{CRUNCH_DEBUG} = 0;
141 my $arv = Arvados->new('apiVersion' => 'v1');
144 my $User = $arv->{'users'}->{'current'}->execute;
152 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Unless --force-unlock was given, refuse to run a job whose record
# shows that it is locked, has a recorded result, is running, or has
# already been started.
153 if (!$force_unlock) {
154 if ($Job->{'is_locked_by_uuid'}) {
155 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# "Not null" means defined. The string operator ne would compare
# against the empty string (undef stringifies to ""), which is not a
# null test, so use defined() instead.
157 if (defined $Job->{'success'}) {
158 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160 if ($Job->{'running'}) {
161 croak("Job 'running' flag is already set");
163 if ($Job->{'started_at'}) {
164 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
170 $Job = JSON::decode_json($jobspec);
174 map { croak ("No $_ specified") unless $Job->{$_} }
175 qw(script script_version script_parameters);
178 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
179 $Job->{'started_at'} = gmtime;
181 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
185 $job_id = $Job->{'uuid'};
187 my $keep_logfile = $job_id . '.log.txt';
188 my $local_logfile = File::Temp->new();
190 $Job->{'runtime_constraints'} ||= {};
191 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
192 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
195 Log (undef, "check slurm allocation");
198 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
202 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
203 push @sinfo, "$localcpus localhost";
205 if (exists $ENV{SLURM_NODELIST})
207 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
211 my ($ncpus, $slurm_nodelist) = split;
212 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
215 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
218 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
221 foreach (split (",", $ranges))
234 push @nodelist, map {
236 $n =~ s/\[[-,\d]+\]/$_/;
243 push @nodelist, $nodelist;
246 foreach my $nodename (@nodelist)
248 Log (undef, "node $nodename - $ncpus slots");
249 my $node = { name => $nodename,
253 foreach my $cpu (1..$ncpus)
255 push @slot, { node => $node,
259 push @node, @nodelist;
264 # Ensure that we get one jobstep running on each allocated node before
265 # we start overloading nodes with concurrent steps
267 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274 # Claim this job, and make sure nobody else does
# After the update, the record must name us as the lock holder.
# UUIDs are strings, so they are compared with eq: a numeric ==
# would numify both sides and could accept two different UUIDs.
275 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
276 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
277 croak("Error while updating / locking job");
279 $Job->update_attributes('started_at' => scalar gmtime,
282 'tasks_summary' => { 'failed' => 0,
289 Log (undef, "start");
290 $SIG{'INT'} = sub { $main::please_freeze = 1; };
291 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
292 $SIG{'TERM'} = \&croak;
293 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
294 $SIG{'ALRM'} = sub { $main::please_info = 1; };
295 $SIG{'CONT'} = sub { $main::please_continue = 1; };
296 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298 $main::please_freeze = 0;
299 $main::please_info = 0;
300 $main::please_continue = 0;
301 $main::please_refresh = 0;
302 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
304 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
305 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
306 $ENV{"JOB_UUID"} = $job_id;
310 my @jobstep_todo = ();
311 my @jobstep_done = ();
312 my @jobstep_tomerge = ();
313 my $jobstep_tomerge_level = 0;
315 my $squeue_kill_checked;
316 my $output_in_keep = 0;
317 my $latest_refresh = scalar time;
321 if (defined $Job->{thawedfromkey})
323 thaw ($Job->{thawedfromkey});
327 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
328 'job_uuid' => $Job->{'uuid'},
333 push @jobstep, { 'level' => 0,
335 'arvados_task' => $first_task,
337 push @jobstep_todo, 0;
343 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
350 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
355 if (!defined $no_clear_tmp) {
356 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
357 system($clear_tmp_cmd) == 0
358 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
361 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
364 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
365 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367 or croak ("setup.py in $src_path failed: exit ".($?>>8));
375 $build_script = <DATA>;
377 Log (undef, "Install revision ".$Job->{script_version});
378 my $nodelist = join(",", @node);
380 if (!defined $no_clear_tmp) {
381 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383 my $cleanpid = fork();
386 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
387 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
392 last if $cleanpid == waitpid (-1, WNOHANG);
393 freeze_if_want_freeze ($cleanpid);
394 select (undef, undef, undef, 0.1);
396 Log (undef, "Clean-work-dir exited $?");
399 # Install requested code version
402 my @srunargs = ("srun",
403 "--nodelist=$nodelist",
404 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
407 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
411 my $treeish = $Job->{'script_version'};
413 # If we're running under crunch-dispatch, it will have pulled the
414 # appropriate source tree into its own repository, and given us that
415 # repo's path as $git_dir. If we're running a "local" job, and a
416 # script_version was specified, it's up to the user to provide the
417 # full path to a local repository in Job->{repository}.
419 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
420 # git-archive --remote where appropriate.
422 # TODO: Accept a locally-hosted Arvados repository by name or
423 # UUID. Use arvados.v1.repositories.list or .get to figure out the
424 # appropriate fetch-url.
425 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427 $ENV{"CRUNCH_SRC_URL"} = $repo;
429 if (-d "$repo/.git") {
430 # We were given a working directory, but we are only interested in
432 $repo = "$repo/.git";
435 # If this looks like a subversion r#, look for it in git-svn commit messages
437 if ($treeish =~ m{^\d{1,4}$}) {
438 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440 if ($gitlog =~ /^[a-f0-9]{40}$/) {
442 Log (undef, "Using commit $commit for script_version $treeish");
446 # If that didn't work, try asking git to look it up as a tree-ish.
448 if (!defined $commit) {
449 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451 if ($found =~ /^[0-9a-f]{40}$/s) {
453 if ($commit ne $treeish) {
454 # Make sure we record the real commit id in the database,
455 # frozentokey, logs, etc. -- instead of an abbreviation or a
456 # branch name which can become ambiguous or point to a
457 # different commit in the future.
458 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
459 Log (undef, "Using commit $commit for tree-ish $treeish");
460 if ($commit ne $treeish) {
461 $Job->{'script_version'} = $commit;
463 $Job->update_attributes('script_version' => $commit) or
464 croak("Error while updating job");
470 if (defined $commit) {
471 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
472 @execargs = ("sh", "-c",
473 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
474 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
477 croak ("could not figure out commit id for $treeish");
480 my $installpid = fork();
481 if ($installpid == 0)
483 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
488 last if $installpid == waitpid (-1, WNOHANG);
489 freeze_if_want_freeze ($installpid);
490 select (undef, undef, undef, 0.1);
492 Log (undef, "Install exited $?");
497 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
498 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
503 foreach (qw (script script_version script_parameters runtime_constraints))
507 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
509 foreach (split (/\n/, $Job->{knobs}))
511 Log (undef, "knob " . $_);
516 $main::success = undef;
522 my $thisround_succeeded = 0;
523 my $thisround_failed = 0;
524 my $thisround_failed_multiple = 0;
526 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
527 or $a <=> $b } @jobstep_todo;
528 my $level = $jobstep[$jobstep_todo[0]]->{level};
529 Log (undef, "start level $level");
534 my @freeslot = (0..$#slot);
537 my $progress_is_dirty = 1;
538 my $progress_stats_updated = 0;
540 update_progress_stats();
545 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
547 my $id = $jobstep_todo[$todo_ptr];
548 my $Jobstep = $jobstep[$id];
549 if ($Jobstep->{level} != $level)
554 pipe $reader{$id}, "writer" or croak ($!);
555 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
556 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
558 my $childslot = $freeslot[0];
559 my $childnode = $slot[$childslot]->{node};
560 my $childslotname = join (".",
561 $slot[$childslot]->{node}->{name},
562 $slot[$childslot]->{cpu});
563 my $childpid = fork();
566 $SIG{'INT'} = 'DEFAULT';
567 $SIG{'QUIT'} = 'DEFAULT';
568 $SIG{'TERM'} = 'DEFAULT';
570 foreach (values (%reader))
# NOTE(review): F_SETFL sets file *status* flags. Close-on-exec is a
# file *descriptor* flag (FD_CLOEXEC), normally changed with F_SETFD,
# so this call may not do what the trailing comment says -- confirm.
574 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
# Point this process's STDOUT and STDERR at the "writer" handle.
# The results of these two opens are not checked.
575 open(STDOUT,">&writer");
576 open(STDERR,">&writer");
581 delete $ENV{"GNUPGHOME"};
582 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
583 $ENV{"TASK_QSEQUENCE"} = $id;
584 $ENV{"TASK_SEQUENCE"} = $level;
585 $ENV{"JOB_SCRIPT"} = $Job->{script};
586 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
587 $param =~ tr/a-z/A-Z/;
588 $ENV{"JOB_PARAMETER_$param"} = $value;
590 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
591 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
592 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
593 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
594 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
595 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
596 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
602 "--nodelist=".$childnode->{name},
603 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
604 "--job-name=$job_id.$id.$$",
606 my @execargs = qw(sh);
607 my $build_script_to_send = "";
609 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
610 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
611 ."&& cd $ENV{CRUNCH_TMP} ";
614 $build_script_to_send = $build_script;
619 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
620 my @execargs = ('bash', '-c', $command);
621 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
625 if (!defined $childpid)
632 $proc{$childpid} = { jobstep => $id,
635 jobstepname => "$job_id.$id.$childpid",
637 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
638 $slot[$childslot]->{pid} = $childpid;
640 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
641 Log ($id, "child $childpid started on $childslotname");
642 $Jobstep->{starttime} = time;
643 $Jobstep->{node} = $childnode->{name};
644 $Jobstep->{slotindex} = $childslot;
645 delete $Jobstep->{stderr};
646 delete $Jobstep->{finishtime};
648 splice @jobstep_todo, $todo_ptr, 1;
651 $progress_is_dirty = 1;
655 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
657 last THISROUND if $main::please_freeze;
658 if ($main::please_info)
660 $main::please_info = 0;
664 update_progress_stats();
671 check_refresh_wanted();
673 update_progress_stats();
674 select (undef, undef, undef, 0.1);
676 elsif (time - $progress_stats_updated >= 30)
678 update_progress_stats();
680 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
681 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
683 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
684 .($thisround_failed+$thisround_succeeded)
685 .") -- giving up on this round";
686 Log (undef, $message);
690 # move slots from freeslot to holdslot (or back to freeslot) if necessary
691 for (my $i=$#freeslot; $i>=0; $i--) {
692 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
693 push @holdslot, (splice @freeslot, $i, 1);
696 for (my $i=$#holdslot; $i>=0; $i--) {
697 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
698 push @freeslot, (splice @holdslot, $i, 1);
702 # give up if no nodes are succeeding
703 if (!grep { $_->{node}->{losing_streak} == 0 &&
704 $_->{node}->{hold_count} < 4 } @slot) {
705 my $message = "Every node has failed -- giving up on this round";
706 Log (undef, $message);
713 push @freeslot, splice @holdslot;
714 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
717 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
720 if ($main::please_continue) {
721 $main::please_continue = 0;
724 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
728 check_refresh_wanted();
730 update_progress_stats();
731 select (undef, undef, undef, 0.1);
732 killem (keys %proc) if $main::please_freeze;
736 update_progress_stats();
737 freeze_if_want_freeze();
740 if (!defined $main::success)
743 $thisround_succeeded == 0 &&
744 ($thisround_failed == 0 || $thisround_failed > 4))
746 my $message = "stop because $thisround_failed tasks failed and none succeeded";
747 Log (undef, $message);
756 goto ONELEVEL if !defined $main::success;
759 release_allocation();
762 $Job->update_attributes('output' => &collate_output(),
764 'success' => $Job->{'output'} && $main::success,
765 'finished_at' => scalar gmtime)
768 if ($Job->{'output'})
771 my $manifest_text = `arv keep get ''\Q$Job->{'output'}\E`;
772 $arv->{'collections'}->{'create'}->execute('collection' => {
773 'uuid' => $Job->{'output'},
774 'manifest_text' => $manifest_text,
776 if ($Job->{'output_is_persistent'}) {
777 $arv->{'links'}->{'create'}->execute('link' => {
778 'tail_kind' => 'arvados#user',
779 'tail_uuid' => $User->{'uuid'},
780 'head_kind' => 'arvados#collection',
781 'head_uuid' => $Job->{'output'},
782 'link_class' => 'resources',
788 Log (undef, "Failed to register output manifest: $@");
792 Log (undef, "finish");
# Record the current task counts (todo / done / running) in
# $Job->{'tasks_summary'} and log a one-line status message.
# Always refreshes $progress_stats_updated; everything else is skipped
# unless $progress_is_dirty is set. Clears $progress_is_dirty at the end.
799 sub update_progress_stats
801 $progress_stats_updated = time;
802 return if !$progress_is_dirty;
# "running" is derived: the slots that are neither free nor on hold.
803 my ($todo, $done, $running) = (scalar @jobstep_todo,
804 scalar @jobstep_done,
805 scalar @slot - scalar @freeslot - scalar @holdslot);
# Create the summary hash on first use; other keys in it are kept.
806 $Job->{'tasks_summary'} ||= {};
807 $Job->{'tasks_summary'}->{'todo'} = $todo;
808 $Job->{'tasks_summary'}->{'done'} = $done;
809 $Job->{'tasks_summary'}->{'running'} = $running;
# Pass the whole summary hash to update_attributes on the job object.
811 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
813 Log (undef, "status: $done done, $running running, $todo todo");
814 $progress_is_dirty = 0;
821 my $pid = waitpid (-1, WNOHANG);
822 return 0 if $pid <= 0;
824 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
826 . $slot[$proc{$pid}->{slot}]->{cpu});
827 my $jobstepid = $proc{$pid}->{jobstep};
828 my $elapsed = time - $proc{$pid}->{time};
829 my $Jobstep = $jobstep[$jobstepid];
831 my $childstatus = $?;
832 my $exitvalue = $childstatus >> 8;
833 my $exitinfo = sprintf("exit %d signal %d%s",
836 ($childstatus & 128 ? ' core dump' : ''));
837 $Jobstep->{'arvados_task'}->reload;
838 my $task_success = $Jobstep->{'arvados_task'}->{success};
840 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
842 if (!defined $task_success) {
843 # task did not indicate one way or the other --> fail
844 $Jobstep->{'arvados_task'}->{success} = 0;
845 $Jobstep->{'arvados_task'}->save;
852 $temporary_fail ||= $Jobstep->{node_fail};
853 $temporary_fail ||= ($exitvalue == 111);
856 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
858 # Check for signs of a failed or misconfigured node
859 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
860 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
861 # Don't count this against jobstep failure thresholds if this
862 # node is already suspected faulty and srun exited quickly
863 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
865 Log ($jobstepid, "blaming failure on suspect node " .
866 $slot[$proc{$pid}->{slot}]->{node}->{name});
867 $temporary_fail ||= 1;
869 ban_node_by_slot($proc{$pid}->{slot});
872 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
873 ++$Jobstep->{'failures'},
874 $temporary_fail ? 'temporary ' : 'permanent',
877 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
878 # Give up on this task, and the whole job
880 $main::please_freeze = 1;
883 # Put this task back on the todo queue
884 push @jobstep_todo, $jobstepid;
886 $Job->{'tasks_summary'}->{'failed'}++;
890 ++$thisround_succeeded;
891 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
892 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
893 push @jobstep_done, $jobstepid;
894 Log ($jobstepid, "success in $elapsed seconds");
896 $Jobstep->{exitcode} = $childstatus;
897 $Jobstep->{finishtime} = time;
898 process_stderr ($jobstepid, $task_success);
899 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
901 close $reader{$jobstepid};
902 delete $reader{$jobstepid};
903 delete $slot[$proc{$pid}->{slot}]->{pid};
904 push @freeslot, $proc{$pid}->{slot};
908 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
910 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
912 'order' => 'qsequence'
914 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
916 'level' => $arvados_task->{'sequence'},
918 'arvados_task' => $arvados_task
920 push @jobstep, $jobstep;
921 push @jobstep_todo, $#jobstep;
924 $progress_is_dirty = 1;
# Re-read the job's cancellation attributes when the file named by
# $ENV{CRUNCH_REFRESH_TRIGGER} has been modified since the last
# refresh, and set $main::please_freeze if the job has been cancelled.
928 sub check_refresh_wanted
# stat returns an empty list if the file cannot be examined, in which
# case nothing below happens.
930 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the file's last-modified time.
931 if (@stat && $stat[9] > $latest_refresh) {
932 $latest_refresh = scalar time;
# Fetch a fresh copy of the job record and copy over only the
# cancellation fields.
934 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
935 for my $attr ('cancelled_at',
936 'cancelled_by_user_uuid',
937 'cancelled_by_client_uuid') {
938 $Job->{$attr} = $Job2->{$attr};
# A true cancelled_at means the job was cancelled: log who did it and
# raise the freeze flag.
940 if ($Job->{'cancelled_at'}) {
941 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
942 " by user " . $Job->{cancelled_by_user_uuid});
944 $main::please_freeze = 1;
952 # return if the kill list was checked <4 seconds ago
953 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
957 $squeue_kill_checked = time;
959 # use killem() on procs whose killtime is reached
962 if (exists $proc{$_}->{killtime}
963 && $proc{$_}->{killtime} <= time)
969 # return if the squeue was checked <60 seconds ago
970 if (defined $squeue_checked && $squeue_checked > time - 60)
974 $squeue_checked = time;
978 # here is an opportunity to check for mysterious problems with local procs
982 # get a list of steps still running
983 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
985 if ($squeue[-1] ne "ok")
991 # which of my jobsteps are running, according to squeue?
995 if (/^(\d+)\.(\d+) (\S+)/)
997 if ($1 eq $ENV{SLURM_JOBID})
1004 # which of my active child procs (>60s old) were not mentioned by squeue?
1005 foreach (keys %proc)
1007 if ($proc{$_}->{time} < time - 60
1008 && !exists $ok{$proc{$_}->{jobstepname}}
1009 && !exists $proc{$_}->{killtime})
1011 # kill this proc if it hasn't exited in 30 seconds
1012 $proc{$_}->{killtime} = time + 30;
# Log the release and run scancel on the SLURM job id taken from
# $ENV{SLURM_JOBID}.
1018 sub release_allocation
1022 Log (undef, "release job allocation");
# Single-string system(): the command line is built by interpolation.
# The exit status of scancel is not examined on this line.
1023 system "scancel $ENV{SLURM_JOBID}";
1031 foreach my $job (keys %reader)
1034 while (0 < sysread ($reader{$job}, $buf, 8192))
1036 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1037 $jobstep[$job]->{stderr} .= $buf;
1038 preprocess_stderr ($job);
1039 if (length ($jobstep[$job]->{stderr}) > 16384)
1041 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume the complete (newline-terminated) lines buffered in
# $jobstep[$job]->{stderr}: log each one and react to known srun error
# messages. A trailing partial line stays in the buffer.
1050 sub preprocess_stderr
# The non-greedy match takes one line at a time; the loop ends when no
# newline remains in the buffer.
1054 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the line and its newline from the front of the buffer.
# (assumes $line holds the text captured above -- its assignment is
# not visible here)
1056 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1057 Log ($job, "stderr $line");
# srun says the allocation has expired or cannot be confirmed: raise
# the freeze flag.
1058 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1060 $main::please_freeze = 1;
# srun reports a node failure or cannot create a job step: mark the
# jobstep as a node failure and back off the node that owns its slot.
1062 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1063 $jobstep[$job]->{node_fail} = 1;
1064 ban_node_by_slot($jobstep[$job]->{slotindex});
1073 my $task_success = shift;
1074 preprocess_stderr ($job);
1077 Log ($job, "stderr $_");
1078 } split ("\n", $jobstep[$job]->{stderr});
# Fetch the data for $hash by running "$arv_cli keep get" and return
# it as one scalar. Dies if the command cannot be started; returns
# undef if reading from it fails.
1084 my ($keep, $child_out, $output_block);
1086 my $cmd = "$arv_cli keep get \Q$hash\E";
1087 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# One sysread on a pipe returns whatever is available at that moment,
# which can be less than the whole block. Keep reading until EOF, or
# until the 64 MiB cap of the original single read is reached.
1088 $output_block = '';
my $max_bytes = 64 * 1024 * 1024;
while (length($output_block) < $max_bytes) {
  my $got = sysread($keep, $output_block,
                    $max_bytes - length($output_block),
                    length($output_block));
  if (!defined $got) {
    # A signal handler interrupted the read: try again.
    next if $!{EINTR};
    # Real read error: report failure as a single undef value, the
    # same shape as the success path's single scalar.
    return undef;
  }
  last if $got == 0;            # EOF
}
1090 return $output_block;
1095 Log (undef, "collate");
1097 my ($child_out, $child_in);
1098 my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1102 next if (!exists $_->{'arvados_task'}->{output} ||
1103 !$_->{'arvados_task'}->{'success'} ||
1104 $_->{'exitcode'} != 0);
1105 my $output = $_->{'arvados_task'}->{output};
1106 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1108 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1109 print $child_in $output;
1111 elsif (@jobstep == 1)
1113 $joboutput = $output;
1116 elsif (defined (my $outblock = fetch_block ($output)))
1118 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1119 print $child_in $outblock;
1123 Log (undef, "XXX fetch_block($output) failed XXX");
1129 if (!defined $joboutput) {
1130 my $s = IO::Select->new($child_out);
1131 if ($s->can_read(120)) {
1132 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1135 Log (undef, "timed out reading from 'arv keep put'");
1142 Log (undef, "output $joboutput");
1143 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1147 Log (undef, "output undef");
1157 my $sig = 2; # SIGINT first
1158 if (exists $proc{$_}->{"sent_$sig"} &&
1159 time - $proc{$_}->{"sent_$sig"} > 4)
1161 $sig = 15; # SIGTERM if SIGINT doesn't work
1163 if (exists $proc{$_}->{"sent_$sig"} &&
1164 time - $proc{$_}->{"sent_$sig"} > 4)
1166 $sig = 9; # SIGKILL if SIGTERM doesn't work
1168 if (!exists $proc{$_}->{"sent_$sig"})
1170 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1172 select (undef, undef, undef, 0.1);
1175 kill $sig, $_; # srun wants two SIGINT to really interrupt
1177 $proc{$_}->{"sent_$sig"} = time;
1178 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1188 vec($bits,fileno($_),1) = 1;
# Write a log message. Each message is formatted as
# "<job_id> <pid> <jobstep_id> <message>" and printed to STDERR; it is
# also printed, with a timestamp, to $metastream.
# NOTE(review): the conditions guarding the $metastream print and the
# body of the per-line loop are not visible here.
1194 sub Log # ($jobstep_id, $logmessage)
# A message containing newlines is split and handled line by line.
1196 if ($_[1] =~ /\n/) {
1197 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected
# output handle.
1202 my $fh = select STDERR; $|=1; select $fh;
1203 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Replace every character outside the range space..~ (octal 176) with
# a backslash followed by its three-digit octal code.
1204 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# The timestamp is only built when it will be used: when $metastream
# is true or STDERR is a terminal.
1207 if ($metastream || -t STDERR) {
1208 my @gmtime = gmtime;
# Format: YYYY-MM-DD_hh:mm:ss, from gmtime's fields.
1209 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1210 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp prefix only when it is a terminal.
1212 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1215 print $metastream $datetime . " " . $message;
1222 my ($package, $file, $line) = caller;
1223 my $message = "@_ at $file line $line\n";
1224 Log (undef, $message);
1225 freeze() if @jobstep_todo;
1226 collate_output() if @jobstep_todo;
1228 save_meta() if $metastream;
1235 return if !$job_has_uuid;
1236 $Job->update_attributes('running' => 0,
1238 'finished_at' => scalar gmtime);
1244 my $justcheckpoint = shift; # false if this will be the last meta saved
1245 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1247 $local_logfile->flush;
1248 my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1249 . quotemeta($local_logfile->filename);
1250 my $loglocator = `$cmd`;
1251 die "system $cmd failed: $?" if $?;
1253 $local_logfile = undef; # the temp file is automatically deleted
1254 Log (undef, "log manifest is $loglocator");
1255 $Job->{'log'} = $loglocator;
1256 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# When $main::please_freeze is set: release the allocation, signal the
# child processes, and reap the ones that have exited. Any pids passed
# as arguments are added to %proc so that they are signalled too.
# NOTE(review): the loop structure around the kill/reap steps and what
# happens afterwards are on lines not visible here.
1260 sub freeze_if_want_freeze
1262 if ($main::please_freeze)
1264 release_allocation();
1267 # kill some srun procs before freeze+stop
# Give each pid from the argument list an empty record in %proc.
1268 map { $proc{$_} = {} } @_;
1271 killem (keys %proc);
# Sleep for a tenth of a second.
1272 select (undef, undef, undef, 0.1);
# Non-blocking reap: drop each exited child from %proc.
1274 while (($died = waitpid (-1, WNOHANG)) > 0)
1276 delete $proc{$died};
1291 Log (undef, "Freeze not implemented");
1298 croak ("Thaw not implemented");
1314 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run a command, through srun when a SLURM allocation is present.
# Positional arguments:
#   $srunargs - array ref: the srun command and its options
#   $execargs - array ref: the command to run
#   $opts     - optional hash ref; {fork} selects system() and returns
#               its status to the caller
# (assumes a further argument is assigned to $stdin on a line not
# visible here)
1321 my $srunargs = shift;
1322 my $execargs = shift;
1323 my $opts = shift || {};
# Without SLURM, the srun arguments are dropped and only $execargs is run.
1325 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# With CRUNCH_DEBUG set, echo a command line to STDERR, putting single
# quotes around any argument that contains a space.
1326 print STDERR (join (" ",
1327 map { / / ? "'$_'" : $_ }
1330 if $ENV{CRUNCH_DEBUG};
# Feed $stdin to the command: open with "-|" forks, connecting the
# child's STDOUT to this process's STDIN.
1332 if (defined $stdin) {
1333 my $child = open STDIN, "-|";
1334 defined $child or die "no fork: $!";
# NOTE(review): presumably these two lines run only in the forked
# child -- the guarding condition is not visible here.
1336 print $stdin or die $!;
1337 close STDOUT or die $!;
1342 return system (@$args) if $opts->{fork};
# NOTE(review): judging by the message below, an exec of @$args
# precedes these lines and they run only when it fails -- confirm.
1345 warn "ENV size is ".length(join(" ",%ENV));
1346 die "exec failed: $!: @$args";
# Put the node that owns slot $slotid on hold: set its hold_until to
# 60 seconds from now, increment its hold_count, and log the back-off.
# (assumes $slotid is taken from the argument list on a line not
# visible here)
1350 sub ban_node_by_slot {
1351 # Don't start any new jobsteps on this node for 60 seconds
1353 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1354 $slot[$slotid]->{node}->{hold_count}++;
1355 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1360 my ($lockfile, $error_message) = @_;
1361 open L, ">", $lockfile or croak("$lockfile: $!");
1362 if (!flock L, LOCK_EX|LOCK_NB) {
1363 croak("Can't lock $lockfile: $error_message\n");
1370 # checkout-and-build
1374 my $destdir = $ENV{"CRUNCH_SRC"};
1375 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1376 my $repo = $ENV{"CRUNCH_SRC_URL"};
1378 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1380 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1384 unlink "$destdir.commit";
1385 open STDOUT, ">", "$destdir.log";
1386 open STDERR, ">&STDOUT";
1389 my @git_archive_data = <DATA>;
1390 if (@git_archive_data) {
1391 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1392 print TARX @git_archive_data;
1394 die "'tar -C $destdir -xf -' exited $?: $!";
1399 chomp ($pwd = `pwd`);
1400 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1403 for my $src_path ("$destdir/arvados/sdk/python") {
1405 shell_or_die ("virtualenv", $install_dir);
1406 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1410 if (-e "$destdir/crunch_scripts/install") {
1411 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1412 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1414 shell_or_die ("./tests/autotests.sh", $install_dir);
1415 } elsif (-e "./install.sh") {
1416 shell_or_die ("./install.sh", $install_dir);
1420 unlink "$destdir.commit.new";
1421 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1422 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1431 if ($ENV{"DEBUG"}) {
1432 print STDERR "@_\n";
1435 or die "@_ failed: $! exit 0x".sprintf("%x",$?);