2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88 if ($ENV{"USER"} ne "crunch" && $< != 0) {
89 # use a tmp dir unique for my uid
90 $ENV{"CRUNCH_TMP"} .= "-$<";
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
104 GetOptions('force-unlock' => \$force_unlock,
105 'git-dir=s' => \$git_dir,
106 'job=s' => \$jobspec,
107 'job-api-token=s' => \$job_api_token,
108 'no-clear-tmp' => \$no_clear_tmp,
109 'resume-stash=s' => \$resume_stash,
112 if (defined $job_api_token) {
113 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
123 $main::ENV{CRUNCH_DEBUG} = 1;
127 $main::ENV{CRUNCH_DEBUG} = 0;
132 my $arv = Arvados->new('apiVersion' => 'v1');
135 my $User = $arv->{'users'}->{'current'}->execute;
143 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144 if (!$force_unlock) {
145 if ($Job->{'is_locked_by_uuid'}) {
146 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# A job that already has a recorded outcome (true or false) must not be
# run again.  Test with defined(): comparing against undef with "ne"
# stringifies undef (and warns), so it cannot tell an empty string from
# a missing value.
if (defined $Job->{'success'}) {
  croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
151 if ($Job->{'running'}) {
152 croak("Job 'running' flag is already set");
154 if ($Job->{'started_at'}) {
155 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
161 $Job = JSON::decode_json($jobspec);
# A job described on the command line must at least name its script, the
# script version and the script parameters; give up on the first one
# that is missing or false.
foreach my $required (qw(script script_version script_parameters)) {
  croak ("No $required specified") unless $Job->{$required};
}
169 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170 $Job->{'started_at'} = gmtime;
172 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
176 $job_id = $Job->{'uuid'};
178 my $keep_logfile = $job_id . '.log.txt';
179 my $local_logfile = File::Temp->new();
181 $Job->{'runtime_constraints'} ||= {};
182 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
183 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
186 Log (undef, "check slurm allocation");
# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g., "4(x3),2,4(x2)")
193 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
194 push @sinfo, "$localcpus localhost";
196 if (exists $ENV{SLURM_NODELIST})
198 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
202 my ($ncpus, $slurm_nodelist) = split;
203 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
206 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
209 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
212 foreach (split (",", $ranges))
225 push @nodelist, map {
227 $n =~ s/\[[-,\d]+\]/$_/;
234 push @nodelist, $nodelist;
237 foreach my $nodename (@nodelist)
239 Log (undef, "node $nodename - $ncpus slots");
240 my $node = { name => $nodename,
244 foreach my $cpu (1..$ncpus)
246 push @slot, { node => $node,
250 push @node, @nodelist;
255 # Ensure that we get one jobstep running on each allocated node before
256 # we start overloading nodes with concurrent steps
258 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
# Claim this job, and make sure nobody else does.  UUIDs are strings, so
# they are compared with "eq": with "==" both sides are converted to
# numbers (non-numeric strings become 0), so two different UUIDs can
# compare as equal and a lock held by someone else would go unnoticed.
unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
        $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
  croak("Error while updating / locking job");
270 $Job->update_attributes('started_at' => scalar gmtime,
273 'tasks_summary' => { 'failed' => 0,
280 Log (undef, "start");
# Signal handlers only raise a flag (except TERM, which goes straight to
# croak); the flags are examined later by the code that runs the tasks.
# INT, QUIT and TSTP all ask for a freeze.
foreach my $signame (qw(INT QUIT TSTP)) {
  $SIG{$signame} = sub { $main::please_freeze = 1; };
}
$SIG{'TERM'} = \&croak;
$SIG{'ALRM'} = sub { $main::please_info = 1; };
$SIG{'CONT'} = sub { $main::please_continue = 1; };
$SIG{'HUP'} = sub { $main::please_refresh = 1; };

# Start with every request flag cleared.
($main::please_freeze,
 $main::please_info,
 $main::please_continue,
 $main::please_refresh) = (0, 0, 0, 0);
293 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every "NOCACHE...=value" line of the job's knobs into the
# environment.  A plain loop replaces grep-in-void-context, and a job
# without knobs is treated as having an empty knob list instead of
# passing undef to split.
my $knob_text = defined $$Job{knobs} ? $$Job{knobs} : "";
foreach my $knob (split ("\n", $knob_text)) {
  $ENV{$1} = $2 if $knob =~ /^(NOCACHE.*?)=(.*)/;
}
$ENV{"CRUNCH_JOB_UUID"} = $job_id;
$ENV{"JOB_UUID"} = $job_id;
301 my @jobstep_todo = ();
302 my @jobstep_done = ();
303 my @jobstep_tomerge = ();
304 my $jobstep_tomerge_level = 0;
306 my $squeue_kill_checked;
307 my $output_in_keep = 0;
308 my $latest_refresh = scalar time;
312 if (defined $Job->{thawedfromkey})
314 thaw ($Job->{thawedfromkey});
318 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
319 'job_uuid' => $Job->{'uuid'},
324 push @jobstep, { 'level' => 0,
326 'arvados_task' => $first_task,
328 push @jobstep_todo, 0;
334 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
341 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
343 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
346 if (!defined $no_clear_tmp) {
347 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
348 system($clear_tmp_cmd) == 0
349 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
351 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
352 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
354 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
355 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
356 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
358 or croak ("setup.py in $src_path failed: exit ".($?>>8));
366 $build_script = <DATA>;
368 Log (undef, "Install revision ".$Job->{script_version});
369 my $nodelist = join(",", @node);
371 if (!defined $no_clear_tmp) {
372 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
374 my $cleanpid = fork();
377 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
378 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
383 last if $cleanpid == waitpid (-1, WNOHANG);
384 freeze_if_want_freeze ($cleanpid);
385 select (undef, undef, undef, 0.1);
387 Log (undef, "Clean-work-dir exited $?");
390 # Install requested code version
393 my @srunargs = ("srun",
394 "--nodelist=$nodelist",
395 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
397 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
398 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
402 my $treeish = $Job->{'script_version'};
404 # If we're running under crunch-dispatch, it will have pulled the
405 # appropriate source tree into its own repository, and given us that
406 # repo's path as $git_dir. If we're running a "local" job, and a
407 # script_version was specified, it's up to the user to provide the
# full path to a local repository in $Job->{'repository'}.
410 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
411 # git-archive --remote where appropriate.
413 # TODO: Accept a locally-hosted Arvados repository by name or
414 # UUID. Use arvados.v1.repositories.list or .get to figure out the
415 # appropriate fetch-url.
416 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
418 $ENV{"CRUNCH_SRC_URL"} = $repo;
420 if (-d "$repo/.git") {
421 # We were given a working directory, but we are only interested in
423 $repo = "$repo/.git";
426 # If this looks like a subversion r#, look for it in git-svn commit messages
428 if ($treeish =~ m{^\d{1,4}$}) {
429 my $gitlog = `git --git-dir="$repo" log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " master`;
431 if ($gitlog =~ /^[a-f0-9]{40}$/) {
433 Log (undef, "Using commit $commit for script_version $treeish");
437 # If that didn't work, try asking git to look it up as a tree-ish.
439 if (!defined $commit) {
440 my $found = `git --git-dir="$repo" rev-list -1 "$treeish"`;
442 if ($found =~ /^[0-9a-f]{40}$/s) {
444 if ($commit ne $treeish) {
445 # Make sure we record the real commit id in the database,
446 # frozentokey, logs, etc. -- instead of an abbreviation or a
447 # branch name which can become ambiguous or point to a
448 # different commit in the future.
449 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
450 Log (undef, "Using commit $commit for tree-ish $treeish");
451 if ($commit ne $treeish) {
452 $Job->{'script_version'} = $commit;
454 $Job->update_attributes('script_version' => $commit) or
455 croak("Error while updating job");
461 if (defined $commit) {
462 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
463 @execargs = ("sh", "-c",
464 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
465 $git_archive = `git --git-dir="$repo" archive "$commit"`;
468 croak ("could not figure out commit id for $treeish");
471 my $installpid = fork();
472 if ($installpid == 0)
474 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
479 last if $installpid == waitpid (-1, WNOHANG);
480 freeze_if_want_freeze ($installpid);
481 select (undef, undef, undef, 0.1);
483 Log (undef, "Install exited $?");
488 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
489 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
494 foreach (qw (script script_version script_parameters runtime_constraints))
498 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
500 foreach (split (/\n/, $Job->{knobs}))
502 Log (undef, "knob " . $_);
507 $main::success = undef;
513 my $thisround_succeeded = 0;
514 my $thisround_failed = 0;
515 my $thisround_failed_multiple = 0;
517 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
518 or $a <=> $b } @jobstep_todo;
519 my $level = $jobstep[$jobstep_todo[0]]->{level};
520 Log (undef, "start level $level");
525 my @freeslot = (0..$#slot);
528 my $progress_is_dirty = 1;
529 my $progress_stats_updated = 0;
531 update_progress_stats();
536 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
538 my $id = $jobstep_todo[$todo_ptr];
539 my $Jobstep = $jobstep[$id];
540 if ($Jobstep->{level} != $level)
545 pipe $reader{$id}, "writer" or croak ($!);
546 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
547 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
549 my $childslot = $freeslot[0];
550 my $childnode = $slot[$childslot]->{node};
551 my $childslotname = join (".",
552 $slot[$childslot]->{node}->{name},
553 $slot[$childslot]->{cpu});
554 my $childpid = fork();
557 $SIG{'INT'} = 'DEFAULT';
558 $SIG{'QUIT'} = 'DEFAULT';
559 $SIG{'TERM'} = 'DEFAULT';
561 foreach (values (%reader))
565 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
566 open(STDOUT,">&writer");
567 open(STDERR,">&writer");
572 delete $ENV{"GNUPGHOME"};
573 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
574 $ENV{"TASK_QSEQUENCE"} = $id;
575 $ENV{"TASK_SEQUENCE"} = $level;
576 $ENV{"JOB_SCRIPT"} = $Job->{script};
577 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
578 $param =~ tr/a-z/A-Z/;
579 $ENV{"JOB_PARAMETER_$param"} = $value;
581 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
582 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
583 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
584 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
585 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
586 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
587 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
593 "--nodelist=".$childnode->{name},
594 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
595 "--job-name=$job_id.$id.$$",
597 my @execargs = qw(sh);
598 my $build_script_to_send = "";
600 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
601 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
602 ."&& cd $ENV{CRUNCH_TMP} ";
605 $build_script_to_send = $build_script;
610 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
611 my @execargs = ('bash', '-c', $command);
612 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
616 if (!defined $childpid)
623 $proc{$childpid} = { jobstep => $id,
626 jobstepname => "$job_id.$id.$childpid",
628 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
629 $slot[$childslot]->{pid} = $childpid;
631 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
632 Log ($id, "child $childpid started on $childslotname");
633 $Jobstep->{starttime} = time;
634 $Jobstep->{node} = $childnode->{name};
635 $Jobstep->{slotindex} = $childslot;
636 delete $Jobstep->{stderr};
637 delete $Jobstep->{finishtime};
639 splice @jobstep_todo, $todo_ptr, 1;
642 $progress_is_dirty = 1;
646 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
648 last THISROUND if $main::please_freeze;
649 if ($main::please_info)
651 $main::please_info = 0;
655 update_progress_stats();
662 check_refresh_wanted();
664 update_progress_stats();
665 select (undef, undef, undef, 0.1);
667 elsif (time - $progress_stats_updated >= 30)
669 update_progress_stats();
671 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
672 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
674 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
675 .($thisround_failed+$thisround_succeeded)
676 .") -- giving up on this round";
677 Log (undef, $message);
681 # move slots from freeslot to holdslot (or back to freeslot) if necessary
682 for (my $i=$#freeslot; $i>=0; $i--) {
683 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
684 push @holdslot, (splice @freeslot, $i, 1);
687 for (my $i=$#holdslot; $i>=0; $i--) {
688 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
689 push @freeslot, (splice @holdslot, $i, 1);
693 # give up if no nodes are succeeding
694 if (!grep { $_->{node}->{losing_streak} == 0 &&
695 $_->{node}->{hold_count} < 4 } @slot) {
696 my $message = "Every node has failed -- giving up on this round";
697 Log (undef, $message);
704 push @freeslot, splice @holdslot;
705 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
708 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
711 if ($main::please_continue) {
712 $main::please_continue = 0;
715 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
719 check_refresh_wanted();
721 update_progress_stats();
722 select (undef, undef, undef, 0.1);
723 killem (keys %proc) if $main::please_freeze;
727 update_progress_stats();
728 freeze_if_want_freeze();
731 if (!defined $main::success)
734 $thisround_succeeded == 0 &&
735 ($thisround_failed == 0 || $thisround_failed > 4))
737 my $message = "stop because $thisround_failed tasks failed and none succeeded";
738 Log (undef, $message);
747 goto ONELEVEL if !defined $main::success;
750 release_allocation();
753 $Job->update_attributes('output' => &collate_output(),
755 'success' => $Job->{'output'} && $main::success,
756 'finished_at' => scalar gmtime)
759 if ($Job->{'output'})
762 my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
763 $arv->{'collections'}->{'create'}->execute('collection' => {
764 'uuid' => $Job->{'output'},
765 'manifest_text' => $manifest_text,
767 if ($Job->{'output_is_persistent'}) {
768 $arv->{'links'}->{'create'}->execute('link' => {
769 'tail_kind' => 'arvados#user',
770 'tail_uuid' => $User->{'uuid'},
771 'head_kind' => 'arvados#collection',
772 'head_uuid' => $Job->{'output'},
773 'link_class' => 'resources',
779 Log (undef, "Failed to register output manifest: $@");
783 Log (undef, "finish");
790 sub update_progress_stats
792 $progress_stats_updated = time;
793 return if !$progress_is_dirty;
794 my ($todo, $done, $running) = (scalar @jobstep_todo,
795 scalar @jobstep_done,
796 scalar @slot - scalar @freeslot - scalar @holdslot);
797 $Job->{'tasks_summary'} ||= {};
798 $Job->{'tasks_summary'}->{'todo'} = $todo;
799 $Job->{'tasks_summary'}->{'done'} = $done;
800 $Job->{'tasks_summary'}->{'running'} = $running;
802 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
804 Log (undef, "status: $done done, $running running, $todo todo");
805 $progress_is_dirty = 0;
812 my $pid = waitpid (-1, WNOHANG);
813 return 0 if $pid <= 0;
815 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
817 . $slot[$proc{$pid}->{slot}]->{cpu});
818 my $jobstepid = $proc{$pid}->{jobstep};
819 my $elapsed = time - $proc{$pid}->{time};
820 my $Jobstep = $jobstep[$jobstepid];
822 my $childstatus = $?;
823 my $exitvalue = $childstatus >> 8;
824 my $exitinfo = sprintf("exit %d signal %d%s",
827 ($childstatus & 128 ? ' core dump' : ''));
828 $Jobstep->{'arvados_task'}->reload;
829 my $task_success = $Jobstep->{'arvados_task'}->{success};
831 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
833 if (!defined $task_success) {
834 # task did not indicate one way or the other --> fail
835 $Jobstep->{'arvados_task'}->{success} = 0;
836 $Jobstep->{'arvados_task'}->save;
843 $temporary_fail ||= $Jobstep->{node_fail};
844 $temporary_fail ||= ($exitvalue == 111);
847 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
849 # Check for signs of a failed or misconfigured node
850 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
851 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
852 # Don't count this against jobstep failure thresholds if this
853 # node is already suspected faulty and srun exited quickly
854 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
856 Log ($jobstepid, "blaming failure on suspect node " .
857 $slot[$proc{$pid}->{slot}]->{node}->{name});
858 $temporary_fail ||= 1;
860 ban_node_by_slot($proc{$pid}->{slot});
863 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
864 ++$Jobstep->{'failures'},
865 $temporary_fail ? 'temporary ' : 'permanent',
868 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
869 # Give up on this task, and the whole job
871 $main::please_freeze = 1;
874 # Put this task back on the todo queue
875 push @jobstep_todo, $jobstepid;
877 $Job->{'tasks_summary'}->{'failed'}++;
881 ++$thisround_succeeded;
882 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
883 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
884 push @jobstep_done, $jobstepid;
885 Log ($jobstepid, "success in $elapsed seconds");
887 $Jobstep->{exitcode} = $childstatus;
888 $Jobstep->{finishtime} = time;
889 process_stderr ($jobstepid, $task_success);
890 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
892 close $reader{$jobstepid};
893 delete $reader{$jobstepid};
894 delete $slot[$proc{$pid}->{slot}]->{pid};
895 push @freeslot, $proc{$pid}->{slot};
899 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
901 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
903 'order' => 'qsequence'
905 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
907 'level' => $arvados_task->{'sequence'},
909 'arvados_task' => $arvados_task
911 push @jobstep, $jobstep;
912 push @jobstep_todo, $#jobstep;
915 $progress_is_dirty = 1;
919 sub check_refresh_wanted
921 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
922 if (@stat && $stat[9] > $latest_refresh) {
923 $latest_refresh = scalar time;
925 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
926 for my $attr ('cancelled_at',
927 'cancelled_by_user_uuid',
928 'cancelled_by_client_uuid') {
929 $Job->{$attr} = $Job2->{$attr};
931 if ($Job->{'cancelled_at'}) {
932 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
933 " by user " . $Job->{cancelled_by_user_uuid});
935 $main::please_freeze = 1;
943 # return if the kill list was checked <4 seconds ago
944 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
948 $squeue_kill_checked = time;
950 # use killem() on procs whose killtime is reached
953 if (exists $proc{$_}->{killtime}
954 && $proc{$_}->{killtime} <= time)
960 # return if the squeue was checked <60 seconds ago
961 if (defined $squeue_checked && $squeue_checked > time - 60)
965 $squeue_checked = time;
969 # here is an opportunity to check for mysterious problems with local procs
973 # get a list of steps still running
974 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
976 if ($squeue[-1] ne "ok")
982 # which of my jobsteps are running, according to squeue?
986 if (/^(\d+)\.(\d+) (\S+)/)
988 if ($1 eq $ENV{SLURM_JOBID})
995 # which of my active child procs (>60s old) were not mentioned by squeue?
998 if ($proc{$_}->{time} < time - 60
999 && !exists $ok{$proc{$_}->{jobstepname}}
1000 && !exists $proc{$_}->{killtime})
1002 # kill this proc if it hasn't exited in 30 seconds
1003 $proc{$_}->{killtime} = time + 30;
1009 sub release_allocation
Log (undef, "release job allocation");
# List form runs scancel directly, so the job id from the environment is
# passed as a single argument and never interpreted by a shell.
system ("scancel", $ENV{SLURM_JOBID});
1022 foreach my $job (keys %reader)
1025 while (0 < sysread ($reader{$job}, $buf, 8192))
1027 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1028 $jobstep[$job]->{stderr} .= $buf;
1029 preprocess_stderr ($job);
1030 if (length ($jobstep[$job]->{stderr}) > 16384)
1032 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1041 sub preprocess_stderr
1045 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1047 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1048 Log ($job, "stderr $line");
1049 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1051 $main::please_freeze = 1;
1053 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1054 $jobstep[$job]->{node_fail} = 1;
1055 ban_node_by_slot($jobstep[$job]->{slotindex});
1064 my $task_success = shift;
1065 preprocess_stderr ($job);
1068 Log ($job, "stderr $_");
1069 } split ("\n", $jobstep[$job]->{stderr});
1075 my ($keep, $child_out, $output_block);
my $cmd = "arv keep get \Q$hash\E";
open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# A single sysread() on a pipe returns only the bytes available at that
# moment, which may be far less than the whole block.  Keep reading
# until end of file so the caller gets the complete data.
$output_block = '';
while (1) {
  my $chunk;
  my $bytes = sysread($keep, $chunk, 1024 * 1024);
  if (!defined $bytes) {
    # Interrupted by a signal handler: just try again.
    next if $!{EINTR};
    # Real read error: report failure to the caller as an undefined
    # block rather than returning truncated data.
    $output_block = undef;
    last;
  }
  last if $bytes == 0;          # end of file
  $output_block .= $chunk;
}
1081 return $output_block;
1086 Log (undef, "collate");
1088 my ($child_out, $child_in);
1089 my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1093 next if (!exists $_->{'arvados_task'}->{output} ||
1094 !$_->{'arvados_task'}->{'success'} ||
1095 $_->{'exitcode'} != 0);
1096 my $output = $_->{'arvados_task'}->{output};
1097 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1099 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1100 print $child_in $output;
1102 elsif (@jobstep == 1)
1104 $joboutput = $output;
1107 elsif (defined (my $outblock = fetch_block ($output)))
1109 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1110 print $child_in $outblock;
1114 Log (undef, "XXX fetch_block($output) failed XXX");
1120 if (!defined $joboutput) {
1121 my $s = IO::Select->new($child_out);
1122 if ($s->can_read(120)) {
1123 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1126 Log (undef, "timed out reading from 'arv keep put'");
1133 Log (undef, "output $joboutput");
1134 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1138 Log (undef, "output undef");
1148 my $sig = 2; # SIGINT first
1149 if (exists $proc{$_}->{"sent_$sig"} &&
1150 time - $proc{$_}->{"sent_$sig"} > 4)
1152 $sig = 15; # SIGTERM if SIGINT doesn't work
1154 if (exists $proc{$_}->{"sent_$sig"} &&
1155 time - $proc{$_}->{"sent_$sig"} > 4)
1157 $sig = 9; # SIGKILL if SIGTERM doesn't work
1159 if (!exists $proc{$_}->{"sent_$sig"})
1161 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1163 select (undef, undef, undef, 0.1);
1166 kill $sig, $_; # srun wants two SIGINT to really interrupt
1168 $proc{$_}->{"sent_$sig"} = time;
1169 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1179 vec($bits,fileno($_),1) = 1;
1185 sub Log # ($jobstep_id, $logmessage)
1187 if ($_[1] =~ /\n/) {
1188 for my $line (split (/\n/, $_[1])) {
1193 my $fh = select STDERR; $|=1; select $fh;
1194 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1195 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1198 if ($metastream || -t STDERR) {
1199 my @gmtime = gmtime;
1200 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1201 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1203 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1206 print $metastream $datetime . " " . $message;
1213 my ($package, $file, $line) = caller;
1214 my $message = "@_ at $file line $line\n";
1215 Log (undef, $message);
1216 freeze() if @jobstep_todo;
1217 collate_output() if @jobstep_todo;
1219 save_meta() if $metastream;
1226 return if !$job_has_uuid;
1227 $Job->update_attributes('running' => 0,
1229 'finished_at' => scalar gmtime);
1235 my $justcheckpoint = shift; # false if this will be the last meta saved
1236 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1238 $local_logfile->flush;
1239 my $cmd = "arv keep put --filename \Q$keep_logfile\E "
1240 . quotemeta($local_logfile->filename);
1241 my $loglocator = `$cmd`;
1242 die "system $cmd failed: $?" if $?;
1244 $local_logfile = undef; # the temp file is automatically deleted
1245 Log (undef, "log manifest is $loglocator");
1246 $Job->{'log'} = $loglocator;
1247 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1251 sub freeze_if_want_freeze
1253 if ($main::please_freeze)
1255 release_allocation();
1258 # kill some srun procs before freeze+stop
1259 map { $proc{$_} = {} } @_;
1262 killem (keys %proc);
1263 select (undef, undef, undef, 0.1);
1265 while (($died = waitpid (-1, WNOHANG)) > 0)
1267 delete $proc{$died};
1282 Log (undef, "Freeze not implemented");
1289 croak ("Thaw not implemented");
1305 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1312 my $srunargs = shift;
1313 my $execargs = shift;
1314 my $opts = shift || {};
1316 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1317 print STDERR (join (" ",
1318 map { / / ? "'$_'" : $_ }
1321 if $ENV{CRUNCH_DEBUG};
1323 if (defined $stdin) {
1324 my $child = open STDIN, "-|";
1325 defined $child or die "no fork: $!";
1327 print $stdin or die $!;
1328 close STDOUT or die $!;
1333 return system (@$args) if $opts->{fork};
1336 warn "ENV size is ".length(join(" ",%ENV));
1337 die "exec failed: $!: @$args";
1341 sub ban_node_by_slot {
1342 # Don't start any new jobsteps on this node for 60 seconds
1344 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1345 $slot[$slotid]->{node}->{hold_count}++;
1346 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1351 my ($lockfile, $error_message) = @_;
1352 open L, ">", $lockfile or croak("$lockfile: $!");
1353 if (!flock L, LOCK_EX|LOCK_NB) {
1354 croak("Can't lock $lockfile: $error_message\n");
1361 # checkout-and-build
1365 my $destdir = $ENV{"CRUNCH_SRC"};
1366 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1367 my $repo = $ENV{"CRUNCH_SRC_URL"};
1369 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1371 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1375 unlink "$destdir.commit";
1376 open STDOUT, ">", "$destdir.log";
1377 open STDERR, ">&STDOUT";
1380 my @git_archive_data = <DATA>;
1381 if (@git_archive_data) {
# Stop here if the tar pipe cannot be opened, instead of printing the
# archive to an unopened handle.
open TARX, "|-", "tar", "-C", $destdir, "-xf", "-"
    or die "cannot run 'tar -C $destdir -xf -': $!";
print TARX @git_archive_data;
1385 die "'tar -C $destdir -xf -' exited $?: $!";
1390 chomp ($pwd = `pwd`);
1391 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1394 for my $src_path ("$destdir/arvados/sdk/python") {
1396 shell_or_die ("virtualenv", $install_dir);
1397 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1401 if (-e "$destdir/crunch_scripts/install") {
1402 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1403 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1405 shell_or_die ("./tests/autotests.sh", $install_dir);
1406 } elsif (-e "./install.sh") {
1407 shell_or_die ("./install.sh", $install_dir);
1411 unlink "$destdir.commit.new";
1412 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1413 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1422 if ($ENV{"DEBUG"}) {
1423 print STDERR "@_\n";
1426 or die "@_ failed: $! exit 0x".sprintf("%x",$?);