2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
# Establish the scratch-directory environment variables used by the rest of
# the script. TMPDIR defaults to /tmp when unset or empty.
78 $ENV{"TMPDIR"} ||= "/tmp";
# Only derive CRUNCH_TMP when the caller has not already provided one.
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# $< is the real uid of this process; the suffix is added only when USER is
# not "crunch" and the uid is non-zero.
81 if ($ENV{"USER"} ne "crunch" && $< != 0) {
82 # use a tmp dir unique for my uid
83 $ENV{"CRUNCH_TMP"} .= "-$<";
# JOB_WORK and CRUNCH_INSTALL both live under CRUNCH_TMP; CRUNCH_WORK is kept
# as an alias of JOB_WORK.
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
88 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# NOTE(review): the return value of mkdir is not checked here, and mkdir does
# not create missing parent directories -- confirm CRUNCH_TMP exists by now.
89 mkdir ($ENV{"JOB_WORK"});
96 GetOptions('force-unlock' => \$force_unlock,
97 'git-dir=s' => \$git_dir,
99 'job-api-token=s' => \$job_api_token,
100 'resume-stash=s' => \$resume_stash,
103 if (defined $job_api_token) {
104 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
107 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
108 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
109 my $local_job = !$job_has_uuid;
114 $main::ENV{CRUNCH_DEBUG} = 1;
118 $main::ENV{CRUNCH_DEBUG} = 0;
123 my $arv = Arvados->new('apiVersion' => 'v1');
126 my $User = $arv->{'users'}->{'current'}->execute;
134 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Unless --force-unlock was given, refuse to run a job that already shows
# signs of having been claimed, started, or finished. Each check aborts via
# croak() with a message naming the offending attribute.
135 if (!$force_unlock) {
136 if ($Job->{'is_locked_by_uuid'}) {
137 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# Test definedness explicitly: "ne undef" stringifies undef to "" (with a
# warning), so it is really an empty-string comparison rather than a null check.
139 if (defined $Job->{'success'}) {
140 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
142 if ($Job->{'running'}) {
143 croak("Job 'running' flag is already set");
145 if ($Job->{'started_at'}) {
146 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
152 $Job = JSON::decode_json($jobspec);
156 map { croak ("No $_ specified") unless $Job->{$_} }
157 qw(script script_version script_parameters);
160 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
161 $Job->{'started_at'} = gmtime;
163 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
167 $job_id = $Job->{'uuid'};
169 my $keep_logfile = $job_id . '.log.txt';
170 my $local_logfile = File::Temp->new();
172 $Job->{'runtime_constraints'} ||= {};
173 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
174 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
177 Log (undef, "check slurm allocation");
180 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g., "4(x3),2,4(x2)")
184 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
185 push @sinfo, "$localcpus localhost";
187 if (exists $ENV{SLURM_NODELIST})
189 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
193 my ($ncpus, $slurm_nodelist) = split;
194 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
197 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
200 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
203 foreach (split (",", $ranges))
216 push @nodelist, map {
218 $n =~ s/\[[-,\d]+\]/$_/;
225 push @nodelist, $nodelist;
228 foreach my $nodename (@nodelist)
230 Log (undef, "node $nodename - $ncpus slots");
231 my $node = { name => $nodename,
235 foreach my $cpu (1..$ncpus)
237 push @slot, { node => $node,
241 push @node, @nodelist;
246 # Ensure that we get one jobstep running on each allocated node before
247 # we start overloading nodes with concurrent steps
249 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
256 # Claim this job, and make sure nobody else does
# The UUIDs are strings, so they must be compared with "eq". A numeric "=="
# would numify both operands (typically to 0) and report a match even when a
# different client holds the lock.
257 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
258 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
259 croak("Error while updating / locking job");
261 $Job->update_attributes('started_at' => scalar gmtime,
264 'tasks_summary' => { 'failed' => 0,
# Install signal handlers. Apart from TERM, each handler only sets a
# $main::please_* flag; the handlers themselves do no other work.
271 Log (undef, "start");
# INT, QUIT and TSTP all request a freeze.
272 $SIG{'INT'} = sub { $main::please_freeze = 1; };
273 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
# TERM invokes croak directly instead of setting a flag.
274 $SIG{'TERM'} = \&croak;
275 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
276 $SIG{'ALRM'} = sub { $main::please_info = 1; };
277 $SIG{'CONT'} = sub { $main::please_continue = 1; };
278 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
# Start with every request flag cleared.
280 $main::please_freeze = 0;
281 $main::please_info = 0;
282 $main::please_continue = 0;
283 $main::please_refresh = 0;
284 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
286 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
287 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
288 $ENV{"JOB_UUID"} = $job_id;
292 my @jobstep_todo = ();
293 my @jobstep_done = ();
294 my @jobstep_tomerge = ();
295 my $jobstep_tomerge_level = 0;
297 my $squeue_kill_checked;
298 my $output_in_keep = 0;
299 my $latest_refresh = scalar time;
303 if (defined $Job->{thawedfromkey})
305 thaw ($Job->{thawedfromkey});
309 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
310 'job_uuid' => $Job->{'uuid'},
315 push @jobstep, { 'level' => 0,
317 'arvados_task' => $first_task,
319 push @jobstep_todo, 0;
326 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
328 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
331 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
332 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
334 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
335 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
336 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
338 or croak ("setup.py in $src_path failed: exit ".($?>>8));
346 $build_script = <DATA>;
348 Log (undef, "Install revision ".$Job->{script_version});
349 my $nodelist = join(",", @node);
351 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
353 my $cleanpid = fork();
356 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
357 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
362 last if $cleanpid == waitpid (-1, WNOHANG);
363 freeze_if_want_freeze ($cleanpid);
364 select (undef, undef, undef, 0.1);
366 Log (undef, "Clean-work-dir exited $?");
368 # Install requested code version
371 my @srunargs = ("srun",
372 "--nodelist=$nodelist",
373 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
375 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
376 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
380 my $treeish = $Job->{'script_version'};
381 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
382 # TODO: let script_version specify the repository instead of expecting
383 # the parent process to figure it out.
384 $ENV{"CRUNCH_SRC_URL"} = $repo;
386 # Create/update our clone of the remote git repo
388 if (!-d $ENV{"CRUNCH_SRC"}) {
389 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
390 or croak ("git clone $repo failed: exit ".($?>>8));
391 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
393 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
395 # If this looks like a subversion r#, look for it in git-svn commit messages
397 if ($treeish =~ m{^\d{1,4}$}) {
398 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
400 if ($gitlog =~ /^[a-f0-9]{40}$/) {
402 Log (undef, "Using commit $commit for script_version $treeish");
406 # If that didn't work, try asking git to look it up as a tree-ish.
408 if (!defined $commit) {
410 my $cooked_treeish = $treeish;
411 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
412 # Looks like a git branch name -- make sure git knows it's
413 # relative to the remote repo
414 $cooked_treeish = "origin/$treeish";
417 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
419 if ($found =~ /^[0-9a-f]{40}$/s) {
421 if ($commit ne $treeish) {
422 # Make sure we record the real commit id in the database,
423 # frozentokey, logs, etc. -- instead of an abbreviation or a
424 # branch name which can become ambiguous or point to a
425 # different commit in the future.
426 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
427 Log (undef, "Using commit $commit for tree-ish $treeish");
428 if ($commit ne $treeish) {
429 $Job->{'script_version'} = $commit;
431 $Job->update_attributes('script_version' => $commit) or
432 croak("Error while updating job");
438 if (defined $commit) {
439 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
440 @execargs = ("sh", "-c",
441 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
442 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
445 croak ("could not figure out commit id for $treeish");
448 my $installpid = fork();
449 if ($installpid == 0)
451 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
456 last if $installpid == waitpid (-1, WNOHANG);
457 freeze_if_want_freeze ($installpid);
458 select (undef, undef, undef, 0.1);
460 Log (undef, "Install exited $?");
465 foreach (qw (script script_version script_parameters runtime_constraints))
469 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
471 foreach (split (/\n/, $Job->{knobs}))
473 Log (undef, "knob " . $_);
478 $main::success = undef;
484 my $thisround_succeeded = 0;
485 my $thisround_failed = 0;
486 my $thisround_failed_multiple = 0;
488 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
489 or $a <=> $b } @jobstep_todo;
490 my $level = $jobstep[$jobstep_todo[0]]->{level};
491 Log (undef, "start level $level");
496 my @freeslot = (0..$#slot);
499 my $progress_is_dirty = 1;
500 my $progress_stats_updated = 0;
502 update_progress_stats();
507 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
509 my $id = $jobstep_todo[$todo_ptr];
510 my $Jobstep = $jobstep[$id];
511 if ($Jobstep->{level} != $level)
516 pipe $reader{$id}, "writer" or croak ($!);
517 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
518 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
520 my $childslot = $freeslot[0];
521 my $childnode = $slot[$childslot]->{node};
522 my $childslotname = join (".",
523 $slot[$childslot]->{node}->{name},
524 $slot[$childslot]->{cpu});
525 my $childpid = fork();
528 $SIG{'INT'} = 'DEFAULT';
529 $SIG{'QUIT'} = 'DEFAULT';
530 $SIG{'TERM'} = 'DEFAULT';
532 foreach (values (%reader))
536 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
537 open(STDOUT,">&writer");
538 open(STDERR,">&writer");
543 delete $ENV{"GNUPGHOME"};
544 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
545 $ENV{"TASK_QSEQUENCE"} = $id;
546 $ENV{"TASK_SEQUENCE"} = $level;
547 $ENV{"JOB_SCRIPT"} = $Job->{script};
548 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
549 $param =~ tr/a-z/A-Z/;
550 $ENV{"JOB_PARAMETER_$param"} = $value;
552 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
553 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
554 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
555 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
556 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
557 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
558 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
564 "--nodelist=".$childnode->{name},
565 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
566 "--job-name=$job_id.$id.$$",
568 my @execargs = qw(sh);
569 my $build_script_to_send = "";
571 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
572 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
573 ."&& cd $ENV{CRUNCH_TMP} ";
576 $build_script_to_send = $build_script;
581 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
582 my @execargs = ('bash', '-c', $command);
583 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
587 if (!defined $childpid)
594 $proc{$childpid} = { jobstep => $id,
597 jobstepname => "$job_id.$id.$childpid",
599 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
600 $slot[$childslot]->{pid} = $childpid;
602 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
603 Log ($id, "child $childpid started on $childslotname");
604 $Jobstep->{starttime} = time;
605 $Jobstep->{node} = $childnode->{name};
606 $Jobstep->{slotindex} = $childslot;
607 delete $Jobstep->{stderr};
608 delete $Jobstep->{finishtime};
610 splice @jobstep_todo, $todo_ptr, 1;
613 $progress_is_dirty = 1;
617 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
619 last THISROUND if $main::please_freeze;
620 if ($main::please_info)
622 $main::please_info = 0;
626 update_progress_stats();
633 check_refresh_wanted();
635 update_progress_stats();
636 select (undef, undef, undef, 0.1);
638 elsif (time - $progress_stats_updated >= 30)
640 update_progress_stats();
642 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
643 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
645 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
646 .($thisround_failed+$thisround_succeeded)
647 .") -- giving up on this round";
648 Log (undef, $message);
652 # move slots from freeslot to holdslot (or back to freeslot) if necessary
653 for (my $i=$#freeslot; $i>=0; $i--) {
654 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
655 push @holdslot, (splice @freeslot, $i, 1);
658 for (my $i=$#holdslot; $i>=0; $i--) {
659 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
660 push @freeslot, (splice @holdslot, $i, 1);
664 # give up if no nodes are succeeding
665 if (!grep { $_->{node}->{losing_streak} == 0 &&
666 $_->{node}->{hold_count} < 4 } @slot) {
667 my $message = "Every node has failed -- giving up on this round";
668 Log (undef, $message);
675 push @freeslot, splice @holdslot;
676 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
679 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
682 if ($main::please_continue) {
683 $main::please_continue = 0;
686 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
690 check_refresh_wanted();
692 update_progress_stats();
693 select (undef, undef, undef, 0.1);
694 killem (keys %proc) if $main::please_freeze;
698 update_progress_stats();
699 freeze_if_want_freeze();
702 if (!defined $main::success)
705 $thisround_succeeded == 0 &&
706 ($thisround_failed == 0 || $thisround_failed > 4))
708 my $message = "stop because $thisround_failed tasks failed and none succeeded";
709 Log (undef, $message);
718 goto ONELEVEL if !defined $main::success;
721 release_allocation();
724 $Job->update_attributes('output' => &collate_output(),
726 'success' => $Job->{'output'} && $main::success,
727 'finished_at' => scalar gmtime)
730 if ($Job->{'output'})
733 my $manifest_text = `arv keep get $Job->{'output'}`;
734 $arv->{'collections'}->{'create'}->execute('collection' => {
735 'uuid' => $Job->{'output'},
736 'manifest_text' => $manifest_text,
740 Log (undef, "Failed to register output manifest: $@");
744 Log (undef, "finish");
# update_progress_stats: recompute the todo/done/running task counts, store
# them in $Job->{'tasks_summary'}, and log a one-line status message.
# Takes no arguments. Does nothing beyond refreshing the timestamp when
# $progress_is_dirty is false.
751 sub update_progress_stats
# The timestamp is refreshed even when the early return below is taken.
753 $progress_stats_updated = time;
754 return if !$progress_is_dirty;
# "running" is derived: every slot that is neither free nor on hold.
755 my ($todo, $done, $running) = (scalar @jobstep_todo,
756 scalar @jobstep_done,
757 scalar @slot - scalar @freeslot - scalar @holdslot);
758 $Job->{'tasks_summary'} ||= {};
759 $Job->{'tasks_summary'}->{'todo'} = $todo;
760 $Job->{'tasks_summary'}->{'done'} = $done;
761 $Job->{'tasks_summary'}->{'running'} = $running;
# NOTE(review): this update may be guarded by a condition on a line not
# visible in this excerpt -- confirm before relying on it being unconditional.
763 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
765 Log (undef, "status: $done done, $running running, $todo todo");
766 $progress_is_dirty = 0;
773 my $pid = waitpid (-1, WNOHANG);
774 return 0 if $pid <= 0;
776 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
778 . $slot[$proc{$pid}->{slot}]->{cpu});
779 my $jobstepid = $proc{$pid}->{jobstep};
780 my $elapsed = time - $proc{$pid}->{time};
781 my $Jobstep = $jobstep[$jobstepid];
783 my $childstatus = $?;
784 my $exitvalue = $childstatus >> 8;
785 my $exitinfo = sprintf("exit %d signal %d%s",
788 ($childstatus & 128 ? ' core dump' : ''));
789 $Jobstep->{'arvados_task'}->reload;
790 my $task_success = $Jobstep->{'arvados_task'}->{success};
792 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
794 if (!defined $task_success) {
795 # task did not indicate one way or the other --> fail
796 $Jobstep->{'arvados_task'}->{success} = 0;
797 $Jobstep->{'arvados_task'}->save;
804 $temporary_fail ||= $Jobstep->{node_fail};
805 $temporary_fail ||= ($exitvalue == 111);
808 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
810 # Check for signs of a failed or misconfigured node
811 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
812 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
813 # Don't count this against jobstep failure thresholds if this
814 # node is already suspected faulty and srun exited quickly
815 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
817 Log ($jobstepid, "blaming failure on suspect node " .
818 $slot[$proc{$pid}->{slot}]->{node}->{name});
819 $temporary_fail ||= 1;
821 ban_node_by_slot($proc{$pid}->{slot});
824 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
825 ++$Jobstep->{'failures'},
826 $temporary_fail ? 'temporary ' : 'permanent',
829 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
830 # Give up on this task, and the whole job
832 $main::please_freeze = 1;
835 # Put this task back on the todo queue
836 push @jobstep_todo, $jobstepid;
838 $Job->{'tasks_summary'}->{'failed'}++;
842 ++$thisround_succeeded;
843 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
844 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
845 push @jobstep_done, $jobstepid;
846 Log ($jobstepid, "success in $elapsed seconds");
848 $Jobstep->{exitcode} = $childstatus;
849 $Jobstep->{finishtime} = time;
850 process_stderr ($jobstepid, $task_success);
851 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
853 close $reader{$jobstepid};
854 delete $reader{$jobstepid};
855 delete $slot[$proc{$pid}->{slot}]->{pid};
856 push @freeslot, $proc{$pid}->{slot};
860 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
862 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
864 'order' => 'qsequence'
866 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
868 'level' => $arvados_task->{'sequence'},
870 'arvados_task' => $arvados_task
872 push @jobstep, $jobstep;
873 push @jobstep_todo, $#jobstep;
876 $progress_is_dirty = 1;
# check_refresh_wanted: if the file named by $ENV{CRUNCH_REFRESH_TRIGGER} has
# been modified since the last refresh, re-read the job record and copy its
# cancellation attributes into $Job. A non-empty cancelled_at requests a freeze.
# Takes no arguments.
880 sub check_refresh_wanted
882 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the file's mtime; an empty @stat means stat failed.
883 if (@stat && $stat[9] > $latest_refresh) {
884 $latest_refresh = scalar time;
# NOTE(review): the lookup uses $jobspec as the uuid; presumably this path is
# only reached for jobs that have a uuid -- confirm against the missing lines.
886 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
887 for my $attr ('cancelled_at',
888 'cancelled_by_user_uuid',
889 'cancelled_by_client_uuid') {
890 $Job->{$attr} = $Job2->{$attr};
892 if ($Job->{'cancelled_at'}) {
893 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
894 " by user " . $Job->{cancelled_by_user_uuid});
896 $main::please_freeze = 1;
904 # return if the kill list was checked <4 seconds ago
905 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
909 $squeue_kill_checked = time;
911 # use killem() on procs whose killtime is reached
914 if (exists $proc{$_}->{killtime}
915 && $proc{$_}->{killtime} <= time)
921 # return if the squeue was checked <60 seconds ago
922 if (defined $squeue_checked && $squeue_checked > time - 60)
926 $squeue_checked = time;
930 # here is an opportunity to check for mysterious problems with local procs
934 # get a list of steps still running
935 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
937 if ($squeue[-1] ne "ok")
943 # which of my jobsteps are running, according to squeue?
947 if (/^(\d+)\.(\d+) (\S+)/)
949 if ($1 eq $ENV{SLURM_JOBID})
956 # which of my active child procs (>60s old) were not mentioned by squeue?
959 if ($proc{$_}->{time} < time - 60
960 && !exists $ok{$proc{$_}->{jobstepname}}
961 && !exists $proc{$_}->{killtime})
963 # kill this proc if it hasn't exited in 30 seconds
964 $proc{$_}->{killtime} = time + 30;
# release_allocation: log the release and run scancel on the job id taken
# from $ENV{SLURM_JOBID}. Takes no arguments.
# NOTE(review): lines between the header and the Log call are not visible
# here; presumably they restrict this to runs under SLURM -- confirm.
970 sub release_allocation
974 Log (undef, "release job allocation");
# The command is passed to system as a single string and its exit status is
# not inspected.
975 system "scancel $ENV{SLURM_JOBID}";
983 foreach my $job (keys %reader)
986 while (0 < sysread ($reader{$job}, $buf, 8192))
988 print STDERR $buf if $ENV{CRUNCH_DEBUG};
989 $jobstep[$job]->{stderr} .= $buf;
990 preprocess_stderr ($job);
991 if (length ($jobstep[$job]->{stderr}) > 16384)
993 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: consume complete (newline-terminated) lines from the
# buffered stderr of one jobstep, log each line, and react to specific srun
# error messages.
# NOTE(review): $job and $line are assigned on lines not visible in this
# excerpt; presumably $job is the jobstep index argument and $line is the
# captured text without its newline -- confirm.
1002 sub preprocess_stderr
# The non-greedy match takes one line at a time from the front of the buffer.
1006 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the line plus its trailing newline from the buffer.
1008 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1009 Log ($job, "stderr $line");
# An expired or unconfirmable SLURM allocation requests a freeze.
1010 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1012 $main::please_freeze = 1;
# A node failure or job-step creation failure flags the jobstep and puts the
# node that ran it on hold.
1014 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1015 $jobstep[$job]->{node_fail} = 1;
1016 ban_node_by_slot($jobstep[$job]->{slotindex});
1025 my $task_success = shift;
1026 preprocess_stderr ($job);
1029 Log ($job, "stderr $_");
1030 } split ("\n", $jobstep[$job]->{stderr});
1036 my ($child_out, $child_in, $output_block);
1038 my $pid = open2($child_out, $child_in, 'arv', 'keep', 'get', $hash);
1039 sysread($child_out, $output_block, 64 * 1024 * 1024);
1041 return $output_block;
1046 Log (undef, "collate");
1048 my ($child_out, $child_in);
1049 my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1053 next if (!exists $_->{'arvados_task'}->{output} ||
1054 !$_->{'arvados_task'}->{'success'} ||
1055 $_->{'exitcode'} != 0);
1056 my $output = $_->{'arvados_task'}->{output};
1057 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1059 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1060 print $child_in $output;
1062 elsif (@jobstep == 1)
1064 $joboutput = $output;
1067 elsif (defined (my $outblock = fetch_block ($output)))
1069 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1070 print $child_in $outblock;
1074 print $child_in "XXX fetch_block($output) failed XXX\n";
1078 if (!defined $joboutput) {
1079 my $s = IO::Select->new($child_out);
1080 sysread($child_out, $joboutput, 64 * 1024 * 1024) if $s->can_read(5);
1087 Log (undef, "output $joboutput");
1088 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1092 Log (undef, "output undef");
1102 my $sig = 2; # SIGINT first
1103 if (exists $proc{$_}->{"sent_$sig"} &&
1104 time - $proc{$_}->{"sent_$sig"} > 4)
1106 $sig = 15; # SIGTERM if SIGINT doesn't work
1108 if (exists $proc{$_}->{"sent_$sig"} &&
1109 time - $proc{$_}->{"sent_$sig"} > 4)
1111 $sig = 9; # SIGKILL if SIGTERM doesn't work
1113 if (!exists $proc{$_}->{"sent_$sig"})
1115 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1117 select (undef, undef, undef, 0.1);
1120 kill $sig, $_; # srun wants two SIGINT to really interrupt
1122 $proc{$_}->{"sent_$sig"} = time;
1123 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1133 vec($bits,fileno($_),1) = 1;
# Log: format one log message and write it to STDERR and, when $metastream is
# set, to $metastream as well.
# Arguments: $_[0] is the jobstep id (callers pass undef for job-level
# messages); $_[1] is the message text.
1139 sub Log # ($jobstep_id, $logmessage)
# Messages containing newlines are handled one line at a time.
# NOTE(review): the body of this loop is not visible in this excerpt.
1141 if ($_[1] =~ /\n/) {
1142 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1147 my $fh = select STDERR; $|=1; select $fh;
# Line format: job id, this process's pid, jobstep id, message.
1148 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Replace every character outside the range space..~ (octal 176) with a
# backslash followed by its three-digit octal code.
1149 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# The UTC timestamp is only computed when it will actually be used.
1152 if ($metastream || -t STDERR) {
1153 my @gmtime = gmtime;
1154 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1155 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp prefix only when it is a terminal.
1157 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1160 print $metastream $datetime . " " . $message;
1167 my ($package, $file, $line) = caller;
1168 my $message = "@_ at $file line $line\n";
1169 Log (undef, $message);
1170 freeze() if @jobstep_todo;
1171 collate_output() if @jobstep_todo;
1173 save_meta() if $metastream;
1180 return if !$job_has_uuid;
1181 $Job->update_attributes('running' => 0,
1183 'finished_at' => scalar gmtime);
1189 my $justcheckpoint = shift; # false if this will be the last meta saved
1190 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1192 $local_logfile->flush;
1193 my $cmd = "arv keep put --filename $keep_logfile ". $local_logfile->filename;
1194 my $loglocator = `$cmd`;
1195 die "system $cmd failed: $?" if $?;
1197 $local_logfile = undef; # the temp file is automatically deleted
1198 Log (undef, "log manifest is $loglocator");
1199 $Job->{'log'} = $loglocator;
1200 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# freeze_if_want_freeze: when $main::please_freeze is set, release the
# allocation, signal all known child processes, and reap those that exit.
# Arguments: an optional list of extra pids to include among the processes
# to kill.
# NOTE(review): several lines (loop headers, what happens after reaping) are
# not visible in this excerpt.
1204 sub freeze_if_want_freeze
1206 if ($main::please_freeze)
1208 release_allocation();
1211 # kill some srun procs before freeze+stop
# Register each pid passed in as an (empty) %proc entry so killem sees it.
1212 map { $proc{$_} = {} } @_;
1215 killem (keys %proc);
# Sleep for 0.1 seconds.
1216 select (undef, undef, undef, 0.1);
# Reap any children that have exited, without blocking.
1218 while (($died = waitpid (-1, WNOHANG)) > 0)
1220 delete $proc{$died};
1235 Log (undef, "Freeze not implemented");
1242 croak ("Thaw not implemented");
1246 # Log (undef, "thaw from $key");
1249 # @jobstep_done = ();
1250 # @jobstep_todo = ();
1251 # @jobstep_tomerge = ();
1252 # $jobstep_tomerge_level = 0;
1253 # my $frozenjob = {};
1255 # my $stream = new Warehouse::Stream ( whc => $whc,
1256 # hash => [split (",", $key)] );
1258 # while (my $dataref = $stream->read_until (undef, "\n\n"))
1260 # if ($$dataref =~ /^job /)
1262 # foreach (split ("\n", $$dataref))
1264 # my ($k, $v) = split ("=", $_, 2);
1265 # $frozenjob->{$k} = freezeunquote ($v);
1270 # if ($$dataref =~ /^merge (\d+) (.*)/)
1272 # $jobstep_tomerge_level = $1;
1274 # = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1278 # my $Jobstep = { };
1279 # foreach (split ("\n", $$dataref))
1281 # my ($k, $v) = split ("=", $_, 2);
1282 # $Jobstep->{$k} = freezeunquote ($v) if $k;
1284 # $Jobstep->{'failures'} = 0;
1285 # push @jobstep, $Jobstep;
1287 # if ($Jobstep->{exitcode} eq "0")
1289 # push @jobstep_done, $#jobstep;
1293 # push @jobstep_todo, $#jobstep;
1297 # foreach (qw (script script_version script_parameters))
1299 # $Job->{$_} = $frozenjob->{$_};
1301 # $Job->save if $job_has_uuid;
1317 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1324 my $srunargs = shift;
1325 my $execargs = shift;
1326 my $opts = shift || {};
1328 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1329 print STDERR (join (" ",
1330 map { / / ? "'$_'" : $_ }
1333 if $ENV{CRUNCH_DEBUG};
1335 if (defined $stdin) {
1336 my $child = open STDIN, "-|";
1337 defined $child or die "no fork: $!";
1339 print $stdin or die $!;
1340 close STDOUT or die $!;
1345 return system (@$args) if $opts->{fork};
1348 warn "ENV size is ".length(join(" ",%ENV));
1349 die "exec failed: $!: @$args";
# ban_node_by_slot: put the node that owns the given slot on hold.
# Sets the node's hold_until to 60 seconds from now, increments its
# hold_count, and logs the back-off.
# NOTE(review): $slotid is assigned on a line not visible in this excerpt;
# presumably it is the first argument, an index into @slot -- confirm.
1353 sub ban_node_by_slot {
1354 # Don't start any new jobsteps on this node for 60 seconds
1356 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1357 $slot[$slotid]->{node}->{hold_count}++;
1358 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1364 # checkout-and-build
1368 my $destdir = $ENV{"CRUNCH_SRC"};
1369 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1370 my $repo = $ENV{"CRUNCH_SRC_URL"};
1372 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1374 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1378 unlink "$destdir.commit";
1379 open STDOUT, ">", "$destdir.log";
1380 open STDERR, ">&STDOUT";
1383 my @git_archive_data = <DATA>;
1384 if (@git_archive_data) {
1385 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1386 print TARX @git_archive_data;
1388 die "'tar -C $destdir -xf -' exited $?: $!";
1393 chomp ($pwd = `pwd`);
1394 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1397 for my $src_path ("$destdir/arvados/sdk/python") {
1399 shell_or_die ("virtualenv", $install_dir);
1400 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1404 if (-e "$destdir/crunch_scripts/install") {
1405 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1406 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1408 shell_or_die ("./tests/autotests.sh", $install_dir);
1409 } elsif (-e "./install.sh") {
1410 shell_or_die ("./install.sh", $install_dir);
1414 unlink "$destdir.commit.new";
1415 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1416 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1425 if ($ENV{"DEBUG"}) {
1426 print STDERR "@_\n";
1429 or die "@_ failed: $! exit 0x".sprintf("%x",$?);