# -*- mode: perl; perl-indent-level: 2; -*-

crunch-job: Execute job steps, save snapshots as requested, collate output.

Obtain job details from Arvados, run tasks on compute nodes (typically
invoked by scheduler on controller):

  crunch-job --job x-y-z

Obtain job details from command line, run tasks on local machine
(typically invoked by application or developer on VM):

  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'

=head1 OPTIONS

=item --force-unlock

If the job is already locked, steal the lock and run it anyway.

=item --git-dir

Path to .git directory where the specified commit is found.

=item --job-api-token

Arvados API authorization token to use during the course of the job.

=head1 RUNNING JOBS LOCALLY

crunch-job's log messages appear on stderr along with the job tasks'
stderr streams. The log is saved in Keep at each checkpoint and when
the job finishes.

If the job succeeds, the job's output locator is printed on stdout.

While the job is running, the following signals are accepted:

=item control-C, SIGINT, SIGQUIT

Save a checkpoint, terminate any job tasks that are running, and stop.

=item SIGALRM

Save a checkpoint and continue.

=item SIGHUP

Refresh node allocation (i.e., check whether any nodes have been added
or taken away) and attributes of the Job record that should affect
behavior (e.g., cancel job if cancelled_at becomes non-nil).
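
For example (illustrative; assumes you know the pid of the running
crunch-job process), a checkpoint can be requested from another shell
without stopping the job:

  kill -ALRM <crunch-job-pid>   # save a checkpoint and keep going
  kill -HUP <crunch-job-pid>    # re-read node allocation and Job attributes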

=cut


use POSIX ':sys_wait_h';
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
# Modules used below (Arvados->new, GetOptions, Warehouse->new, capturex):
use Arvados;
use Getopt::Long;
use Warehouse;
use Warehouse::Stream;
use IPC::System::Simple qw(capturex);
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81 if ($ENV{"USER"} ne "crunch" && $< != 0) {
82 # use a tmp dir unique for my uid
83 $ENV{"CRUNCH_TMP"} .= "-$<";
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
88 mkdir ($ENV{"JOB_WORK"});
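
# For example (illustrative): with TMPDIR=/tmp and a non-root user other than
# "crunch" whose uid is 1000, the settings above come out as
#   CRUNCH_TMP = /tmp/crunch-job-1000
#   JOB_WORK   = /tmp/crunch-job-1000/work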

GetOptions('force-unlock' => \$force_unlock,
           'git-dir=s' => \$git_dir,
           'job=s' => \$jobspec,
           'job-api-token=s' => \$job_api_token,
           'resume-stash=s' => \$resume_stash,
          );

if (defined $job_api_token) {
  $ENV{ARVADOS_API_TOKEN} = $job_api_token;
}

my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
my $local_job = !$job_has_uuid;
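
# As described in the SYNOPSIS, --job is either the UUID of an existing Job
# record (fetched from Arvados below) or an inline JSON job description
# (decoded and submitted as a new Job record); $job_has_uuid tells the two
# cases apart.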

$main::ENV{CRUNCH_DEBUG} = 1;
$main::ENV{CRUNCH_DEBUG} = 0;

my $arv = Arvados->new;
my $metastream = Warehouse::Stream->new(whc => new Warehouse);
$metastream->write_start('log.txt');

my $User = $arv->{'users'}->{'current'}->execute;

$Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
if (!$force_unlock) {
  if ($Job->{'is_locked_by_uuid'}) {
    croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
  }
  if (defined $Job->{'success'}) {
    croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
  }
  if ($Job->{'running'}) {
    croak("Job 'running' flag is already set");
  }
  if ($Job->{'started_at'}) {
    croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
  }
}

$Job = JSON::decode_json($jobspec);

map { croak ("No $_ specified") unless $Job->{$_} }
    qw(script script_version script_parameters);

$Job->{'is_locked_by_uuid'} = $User->{'uuid'};
$Job->{'started_at'} = gmtime;

$Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);

$job_id = $Job->{'uuid'};

$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
177 Log (undef, "check slurm allocation");
180 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
184 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
185 push @sinfo, "$localcpus localhost";
187 if (exists $ENV{SLURM_NODELIST})
189 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
193 my ($ncpus, $slurm_nodelist) = split;
194 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
197 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
200 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
203 foreach (split (",", $ranges))
216 push @nodelist, map {
218 $n =~ s/\[[-,\d]+\]/$_/;
225 push @nodelist, $nodelist;
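
# Illustrative example: a SLURM_NODELIST entry like "compute[1-3,5]" expands
# above to the node names compute1, compute2, compute3 and compute5; a plain
# name with no bracket expression is used as-is.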

foreach my $nodename (@nodelist)
  Log (undef, "node $nodename - $ncpus slots");
  my $node = { name => $nodename,
  foreach my $cpu (1..$ncpus)
    push @slot, { node => $node,

push @node, @nodelist;

# Ensure that we get one jobstep running on each allocated node before
# we start overloading nodes with concurrent steps
@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
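# Illustrative example: with nodes A and B offering two slots each, the sort
# above puts both cpu-1 slots ahead of both cpu-2 slots, so one task lands on
# each node before any node is asked to run a second concurrent task.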

# Claim this job, and make sure nobody else does
unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
        $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
  croak("Error while updating / locking job");
}
$Job->update_attributes('started_at' => scalar gmtime,
                        'tasks_summary' => { 'failed' => 0,
271 Log (undef, "start");
272 $SIG{'INT'} = sub { $main::please_freeze = 1; };
273 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
274 $SIG{'TERM'} = \&croak;
275 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
276 $SIG{'ALRM'} = sub { $main::please_info = 1; };
277 $SIG{'CONT'} = sub { $main::please_continue = 1; };
278 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
280 $main::please_freeze = 0;
281 $main::please_info = 0;
282 $main::please_continue = 0;
283 $main::please_refresh = 0;
284 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
286 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
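# Illustrative example: a line like "NOCACHE=1" (or any knob whose name starts
# with "NOCACHE") in $Job->{knobs} is exported as an environment variable of
# the same name.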
287 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
288 $ENV{"JOB_UUID"} = $job_id;
292 my @jobstep_todo = ();
293 my @jobstep_done = ();
294 my @jobstep_tomerge = ();
295 my $jobstep_tomerge_level = 0;
297 my $squeue_kill_checked;
298 my $output_in_keep = 0;
302 if (defined $Job->{thawedfromkey})
304 thaw ($Job->{thawedfromkey});
308 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
309 'job_uuid' => $Job->{'uuid'},
314 push @jobstep, { 'level' => 0,
316 'arvados_task' => $first_task,
318 push @jobstep_todo, 0;
325 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
327 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
330 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
336 $build_script = <DATA>;
338 Log (undef, "Install revision ".$Job->{script_version});
339 my $nodelist = join(",", @node);
341 # Clean out crunch_tmp/work and crunch_tmp/opt
343 my $cleanpid = fork();
346 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
347 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
352 last if $cleanpid == waitpid (-1, WNOHANG);
353 freeze_if_want_freeze ($cleanpid);
354 select (undef, undef, undef, 0.1);
356 Log (undef, "Clean-work-dir exited $?");

# Install requested code version

my @srunargs = ("srun",
                "--nodelist=$nodelist",
                "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");

$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
$ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
$ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";

my $treeish = $Job->{'script_version'};
my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
# Todo: let script_version specify repository instead of expecting
# parent process to figure it out.
$ENV{"CRUNCH_SRC_URL"} = $repo;

# Create/update our clone of the remote git repo
if (!-d $ENV{"CRUNCH_SRC"}) {
  system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
      or croak ("git clone $repo failed: exit ".($?>>8));
  system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
}
`cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;

# If this looks like a subversion r#, look for it in git-svn commit messages
if ($treeish =~ m{^\d{1,4}$}) {
  my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
  chomp $gitlog;
  if ($gitlog =~ /^[a-f0-9]{40}$/) {
    $commit = $gitlog;
    Log (undef, "Using commit $commit for script_version $treeish");
  }
}

# If that didn't work, try asking git to look it up as a tree-ish.
if (!defined $commit) {
  my $cooked_treeish = $treeish;
  if ($treeish !~ m{^[0-9a-f]{5,}$}) {
    # Looks like a git branch name -- make sure git knows it's
    # relative to the remote repo
    $cooked_treeish = "origin/$treeish";
  }

  my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
  chomp $found;
  if ($found =~ /^[0-9a-f]{40}$/s) {
    $commit = $found;
    if ($commit ne $treeish) {
      # Make sure we record the real commit id in the database,
      # frozentokey, logs, etc. -- instead of an abbreviation or a
      # branch name which can become ambiguous or point to a
      # different commit in the future.
      $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
      Log (undef, "Using commit $commit for tree-ish $treeish");
      if ($commit ne $treeish) {
        $Job->{'script_version'} = $commit;
        $Job->update_attributes('script_version' => $commit) or
            croak("Error while updating job");

if (defined $commit) {
  $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
  @execargs = ("sh", "-c",
               "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
  $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;

croak ("could not figure out commit id for $treeish");

my $installpid = fork();
if ($installpid == 0)
  srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
last if $installpid == waitpid (-1, WNOHANG);
freeze_if_want_freeze ($installpid);
select (undef, undef, undef, 0.1);
Log (undef, "Install exited $?");

foreach (qw (script script_version script_parameters runtime_constraints))
    (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
foreach (split (/\n/, $Job->{knobs}))
  Log (undef, "knob " . $_);

$main::success = undef;

my $thisround_succeeded = 0;
my $thisround_failed = 0;
my $thisround_failed_multiple = 0;

@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};
Log (undef, "start level $level");

my @freeslot = (0..$#slot);
my $progress_is_dirty = 1;
my $progress_stats_updated = 0;

update_progress_stats();

for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
my $id = $jobstep_todo[$todo_ptr];
my $Jobstep = $jobstep[$id];
if ($Jobstep->{level} != $level)

pipe $reader{$id}, "writer" or croak ($!);
my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);

my $childslot = $freeslot[0];
my $childnode = $slot[$childslot]->{node};
my $childslotname = join (".",
                          $slot[$childslot]->{node}->{name},
                          $slot[$childslot]->{cpu});
my $childpid = fork();

$SIG{'INT'} = 'DEFAULT';
$SIG{'QUIT'} = 'DEFAULT';
$SIG{'TERM'} = 'DEFAULT';

foreach (values (%reader))

fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
open(STDOUT,">&writer");
open(STDERR,">&writer");

delete $ENV{"GNUPGHOME"};
$ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
$ENV{"TASK_QSEQUENCE"} = $id;
$ENV{"TASK_SEQUENCE"} = $level;
$ENV{"JOB_SCRIPT"} = $Job->{script};
while (my ($param, $value) = each %{$Job->{script_parameters}}) {
  $param =~ tr/a-z/A-Z/;
  $ENV{"JOB_PARAMETER_$param"} = $value;
$ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
$ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
$ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
$ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
$ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
553 "--nodelist=".$childnode->{name},
554 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
555 "--job-name=$job_id.$id.$$",
557 my @execargs = qw(sh);
558 my $build_script_to_send = "";
560 "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
561 ."&& cd $ENV{CRUNCH_TMP} ";
564 $build_script_to_send = $build_script;
568 $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
569 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
570 $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
572 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
573 my @execargs = ('bash', '-c', $command);
574 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
578 if (!defined $childpid)
585 $proc{$childpid} = { jobstep => $id,
588 jobstepname => "$job_id.$id.$childpid",
590 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
591 $slot[$childslot]->{pid} = $childpid;
593 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
594 Log ($id, "child $childpid started on $childslotname");
595 $Jobstep->{starttime} = time;
596 $Jobstep->{node} = $childnode->{name};
597 $Jobstep->{slotindex} = $childslot;
598 delete $Jobstep->{stderr};
599 delete $Jobstep->{finishtime};
601 splice @jobstep_todo, $todo_ptr, 1;
604 $progress_is_dirty = 1;
608 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
610 last THISROUND if $main::please_freeze;
611 if ($main::please_refresh)
613 $main::please_refresh = 0;
615 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
616 for my $attr ('cancelled_at',
617 'cancelled_by_user_uuid',
618 'cancelled_by_client_uuid') {
619 $Job->{$attr} = $Job2->{$attr};
621 if ($Job->{'cancelled_at'}) {
622 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
623 " by user " . $Job->{cancelled_by_user_uuid});
625 $main::please_freeze = 1;
629 if ($main::please_info)
631 $main::please_info = 0;
635 update_progress_stats();
643 update_progress_stats();
644 select (undef, undef, undef, 0.1);
646 elsif (time - $progress_stats_updated >= 30)
648 update_progress_stats();

if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
    ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
  my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
      .($thisround_failed+$thisround_succeeded)
      .") -- giving up on this round";
  Log (undef, $message);

# move slots from freeslot to holdslot (or back to freeslot) if necessary
for (my $i=$#freeslot; $i>=0; $i--) {
  if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
    push @holdslot, (splice @freeslot, $i, 1);

for (my $i=$#holdslot; $i>=0; $i--) {
  if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
    push @freeslot, (splice @holdslot, $i, 1);

# give up if no nodes are succeeding
if (!grep { $_->{node}->{losing_streak} == 0 &&
            $_->{node}->{hold_count} < 4 } @slot) {
  my $message = "Every node has failed -- giving up on this round";
  Log (undef, $message);

push @freeslot, splice @holdslot;
map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
687 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
690 if ($main::please_continue) {
691 $main::please_continue = 0;
694 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
699 update_progress_stats();
700 select (undef, undef, undef, 0.1);
701 killem (keys %proc) if $main::please_freeze;
705 update_progress_stats();
706 freeze_if_want_freeze();
709 if (!defined $main::success)
712 $thisround_succeeded == 0 &&
713 ($thisround_failed == 0 || $thisround_failed > 4))
715 my $message = "stop because $thisround_failed tasks failed and none succeeded";
716 Log (undef, $message);
725 goto ONELEVEL if !defined $main::success;
728 release_allocation();
731 $Job->update_attributes('output' => &collate_output(),
733 'success' => $Job->{'output'} && $main::success,
734 'finished_at' => scalar gmtime)
737 if ($Job->{'output'})
740 my $manifest_text = capturex("whget", $Job->{'output'});
741 $arv->{'collections'}->{'create'}->execute('collection' => {
742 'uuid' => $Job->{'output'},
743 'manifest_text' => $manifest_text,
747 Log (undef, "Failed to register output manifest: $@");
751 Log (undef, "finish");

sub update_progress_stats
  $progress_stats_updated = time;
  return if !$progress_is_dirty;
  my ($todo, $done, $running) = (scalar @jobstep_todo,
                                 scalar @jobstep_done,
                                 scalar @slot - scalar @freeslot - scalar @holdslot);
  $Job->{'tasks_summary'} ||= {};
  $Job->{'tasks_summary'}->{'todo'} = $todo;
  $Job->{'tasks_summary'}->{'done'} = $done;
  $Job->{'tasks_summary'}->{'running'} = $running;
  $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
  Log (undef, "status: $done done, $running running, $todo todo");
  $progress_is_dirty = 0;

my $pid = waitpid (-1, WNOHANG);
return 0 if $pid <= 0;

my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
                . "."
                . $slot[$proc{$pid}->{slot}]->{cpu});
my $jobstepid = $proc{$pid}->{jobstep};
my $elapsed = time - $proc{$pid}->{time};
my $Jobstep = $jobstep[$jobstepid];

my $childstatus = $?;
my $exitvalue = $childstatus >> 8;
my $exitinfo = sprintf("exit %d signal %d%s",
                       $exitvalue,
                       $childstatus & 127,
                       ($childstatus & 128 ? ' core dump' : ''));
$Jobstep->{'arvados_task'}->reload;
my $task_success = $Jobstep->{'arvados_task'}->{success};

Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");

if (!defined $task_success) {
  # task did not indicate one way or the other --> fail
  $Jobstep->{'arvados_task'}->{success} = 0;
  $Jobstep->{'arvados_task'}->save;

$temporary_fail ||= $Jobstep->{node_fail};
$temporary_fail ||= ($exitvalue == 111);

++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;

# Check for signs of a failed or misconfigured node
if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
    2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
  # Don't count this against jobstep failure thresholds if this
  # node is already suspected faulty and srun exited quickly
  if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
    Log ($jobstepid, "blaming failure on suspect node " .
         $slot[$proc{$pid}->{slot}]->{node}->{name});
    $temporary_fail ||= 1;

  ban_node_by_slot($proc{$pid}->{slot});
Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                         ++$Jobstep->{'failures'},
                         $temporary_fail ? 'temporary' : 'permanent',
                         $elapsed));

if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
  # Give up on this task, and the whole job
  $main::please_freeze = 1;

# Put this task back on the todo queue
push @jobstep_todo, $jobstepid;

$Job->{'tasks_summary'}->{'failed'}++;

++$thisround_succeeded;
$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
$slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
push @jobstep_done, $jobstepid;
Log ($jobstepid, "success in $elapsed seconds");
855 $Jobstep->{exitcode} = $childstatus;
856 $Jobstep->{finishtime} = time;
857 process_stderr ($jobstepid, $task_success);
858 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
860 close $reader{$jobstepid};
861 delete $reader{$jobstepid};
862 delete $slot[$proc{$pid}->{slot}]->{pid};
863 push @freeslot, $proc{$pid}->{slot};
867 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
869 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
871 'order' => 'qsequence'
873 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
875 'level' => $arvados_task->{'sequence'},
877 'arvados_task' => $arvados_task
879 push @jobstep, $jobstep;
880 push @jobstep_todo, $#jobstep;
883 $progress_is_dirty = 1;

# return if the kill list was checked <4 seconds ago
if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)

$squeue_kill_checked = time;

# use killem() on procs whose killtime is reached
if (exists $proc{$_}->{killtime}
    && $proc{$_}->{killtime} <= time)

# return if the squeue was checked <60 seconds ago
if (defined $squeue_checked && $squeue_checked > time - 60)

$squeue_checked = time;

# here is an opportunity to check for mysterious problems with local procs

# get a list of steps still running
my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
chomp @squeue;
if ($squeue[-1] ne "ok")

# which of my jobsteps are running, according to squeue?
if (/^(\d+)\.(\d+) (\S+)/)
  if ($1 eq $ENV{SLURM_JOBID})

# which of my active child procs (>60s old) were not mentioned by squeue?
if ($proc{$_}->{time} < time - 60
    && !exists $ok{$proc{$_}->{jobstepname}}
    && !exists $proc{$_}->{killtime})
  # kill this proc if it hasn't exited in 30 seconds
  $proc{$_}->{killtime} = time + 30;

sub release_allocation
  Log (undef, "release job allocation");
  system "scancel $ENV{SLURM_JOBID}";

foreach my $job (keys %reader)
  while (0 < sysread ($reader{$job}, $buf, 8192))
    print STDERR $buf if $ENV{CRUNCH_DEBUG};
    $jobstep[$job]->{stderr} .= $buf;
    preprocess_stderr ($job);
    if (length ($jobstep[$job]->{stderr}) > 16384)
      substr ($jobstep[$job]->{stderr}, 0, 8192) = "";

sub preprocess_stderr
  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
    my $line = $1;
    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
    Log ($job, "stderr $line");
    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
      $main::please_freeze = 1;
    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
      $jobstep[$job]->{node_fail} = 1;
      ban_node_by_slot($jobstep[$job]->{slotindex});

my $task_success = shift;
preprocess_stderr ($job);

  Log ($job, "stderr $_");
} split ("\n", $jobstep[$job]->{stderr});

my $whc = Warehouse->new;
Log (undef, "collate");

$whc->write_start (1);

next if (!exists $_->{'arvados_task'}->{output} ||
         !$_->{'arvados_task'}->{'success'} ||
         $_->{'exitcode'} != 0);
my $output = $_->{'arvados_task'}->{output};
if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
  $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
  $whc->write_data ($output);
elsif (@jobstep == 1)
  $joboutput = $output;
elsif (defined (my $outblock = $whc->fetch_block ($output)))
  $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
  $whc->write_data ($outblock);

my $errstr = $whc->errstr;
$whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");

$joboutput = $whc->write_finish if !defined $joboutput;

Log (undef, "output $joboutput");
$Job->update_attributes('output' => $joboutput) if $job_has_uuid;

Log (undef, "output undef");

my $sig = 2; # SIGINT first
if (exists $proc{$_}->{"sent_$sig"} &&
    time - $proc{$_}->{"sent_$sig"} > 4)
  $sig = 15; # SIGTERM if SIGINT doesn't work
if (exists $proc{$_}->{"sent_$sig"} &&
    time - $proc{$_}->{"sent_$sig"} > 4)
  $sig = 9; # SIGKILL if SIGTERM doesn't work
if (!exists $proc{$_}->{"sent_$sig"})
  Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
  select (undef, undef, undef, 0.1);
  kill $sig, $_; # srun wants two SIGINT to really interrupt
  $proc{$_}->{"sent_$sig"} = time;
  $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};

vec($bits,fileno($_),1) = 1;

sub Log # ($jobstep_id, $logmessage)
  if ($_[1] =~ /\n/) {
    for my $line (split (/\n/, $_[1])) {

  my $fh = select STDERR; $|=1; select $fh;
  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;

  if ($metastream || -t STDERR) {
    my @gmtime = gmtime;
    $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
                         $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
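    # Illustrative example: $datetime formats as "2014-01-15_17:03:42" (UTC).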
  print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);

  return if !$metastream;
  $metastream->write_data ($datetime . " " . $message);

my ($package, $file, $line) = caller;
my $message = "@_ at $file line $line\n";
Log (undef, $message);
freeze() if @jobstep_todo;
collate_output() if @jobstep_todo;
save_meta() if $metastream;

return if !$job_has_uuid;
$Job->update_attributes('running' => 0,
                        'finished_at' => scalar gmtime);

my $justcheckpoint = shift; # false if this will be the last meta saved
my $m = $metastream;
$m = $m->copy if $justcheckpoint;

my $loglocator = $m->as_key;
undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
Log (undef, "meta key is $loglocator");
$Job->{'log'} = $loglocator;
$Job->update_attributes('log' => $loglocator) if $job_has_uuid;

sub freeze_if_want_freeze
  if ($main::please_freeze)
    release_allocation();

# kill some srun procs before freeze+stop
map { $proc{$_} = {} } @_;

killem (keys %proc);

select (undef, undef, undef, 0.1);
while (($died = waitpid (-1, WNOHANG)) > 0)
  delete $proc{$died};

Log (undef, "Freeze not implemented");

croak ("Thaw not implemented");

Log (undef, "thaw from $key");

@jobstep_tomerge = ();
$jobstep_tomerge_level = 0;

my $stream = new Warehouse::Stream ( whc => $whc,
                                     hash => [split (",", $key)] );

while (my $dataref = $stream->read_until (undef, "\n\n"))
  if ($$dataref =~ /^job /)
    foreach (split ("\n", $$dataref))
      my ($k, $v) = split ("=", $_, 2);
      $frozenjob->{$k} = freezeunquote ($v);

  if ($$dataref =~ /^merge (\d+) (.*)/)
    $jobstep_tomerge_level = $1;
      = map { freezeunquote ($_) } split ("\n", freezeunquote($2));

  foreach (split ("\n", $$dataref))
    my ($k, $v) = split ("=", $_, 2);
    $Jobstep->{$k} = freezeunquote ($v) if $k;
  $Jobstep->{'failures'} = 0;
  push @jobstep, $Jobstep;

  if ($Jobstep->{exitcode} eq "0")
    push @jobstep_done, $#jobstep;
  push @jobstep_todo, $#jobstep;

foreach (qw (script script_version script_parameters))
  $Job->{$_} = $frozenjob->{$_};

$Job->save if $job_has_uuid;

$s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;

my $srunargs = shift;
my $execargs = shift;
my $opts = shift || {};

my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
print STDERR (join (" ",
                    map { / / ? "'$_'" : $_ }
    if $ENV{CRUNCH_DEBUG};

if (defined $stdin) {
  my $child = open STDIN, "-|";
  defined $child or die "no fork: $!";
  print $stdin or die $!;
  close STDOUT or die $!;

return system (@$args) if $opts->{fork};

warn "ENV size is ".length(join(" ",%ENV));
die "exec failed: $!: @$args";

sub ban_node_by_slot {
  # Don't start any new jobsteps on this node for 60 seconds
  $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
  $slot[$slotid]->{node}->{hold_count}++;
  Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");

# checkout-and-build

my $destdir = $ENV{"CRUNCH_SRC"};
my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
my $repo = $ENV{"CRUNCH_SRC_URL"};

open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
if (readlink ("$destdir.commit") eq $commit) {

unlink "$destdir.commit";
open STDOUT, ">", "$destdir.log";
open STDERR, ">&STDOUT";

open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
die "'tar -C $destdir -xf -' exited $?: $!";

chomp ($pwd = `pwd`);
my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";

if (-e "$destdir/crunch_scripts/install") {
  shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
  shell_or_die ("./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
  shell_or_die ("./install.sh", $install_dir);

unlink "$destdir.commit.new";
symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";

if ($ENV{"DEBUG"}) {
  print STDERR "@_\n";

    or die "@_ failed: $! exit 0x".sprintf("%x",$?);