2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
# Establish the scratch-directory environment used by the rest of the script.
# TMPDIR falls back to /tmp when it is unset or empty.
78 $ENV{"TMPDIR"} ||= "/tmp";
# CRUNCH_TMP is only derived when the caller has not already defined it.
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# Any USER other than crunch running with a non-zero real uid gets a
# per-uid suffix appended to the directory name.
81 if ($ENV{"USER"} ne "crunch" && $< != 0) {
82 # use a tmp dir unique for my uid
83 $ENV{"CRUNCH_TMP"} .= "-$<";
# JOB_WORK lives under CRUNCH_TMP; CRUNCH_WORK is exported as an alias of it.
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# NOTE(review): the mkdir result is not checked on this line.
88 mkdir ($ENV{"JOB_WORK"});
# Parse command-line options into the corresponding script variables.
95 GetOptions('force-unlock' => \$force_unlock,
96 'git-dir=s' => \$git_dir,
98 'job-api-token=s' => \$job_api_token,
99 'resume-stash=s' => \$resume_stash,
# A token supplied on the command line replaces ARVADOS_API_TOKEN in the
# environment.
102 if (defined $job_api_token) {
103 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# SLURM is considered available only when both SLURM_JOBID and
# SLURM_NODELIST exist in the environment.
106 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A jobspec made up solely of lowercase letters, digits and hyphens is
# treated as a job UUID; any other jobspec marks this as a local job.
107 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
108 my $local_job = !$job_has_uuid;
# NOTE(review): the conditions that choose between the two CRUNCH_DEBUG
# assignments below are not visible in this excerpt.
113 $main::ENV{CRUNCH_DEBUG} = 1;
117 $main::ENV{CRUNCH_DEBUG} = 0;
# Connect to the API, look up the current user, then fetch the job record
# named by the jobspec.
122 my $arv = Arvados->new('apiVersion' => 'v1');
125 my $User = $arv->{'users'}->{'current'}->execute;
133 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Unless force-unlock was requested, refuse to run a job that shows any sign
# of having been claimed, started or finished already.
134 if (!$force_unlock) {
135 if ($Job->{'is_locked_by_uuid'}) {
136 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# Test definedness directly. Comparing against undef with a string operator
# compares against the empty string and emits a warning.
138 if (defined $Job->{'success'}) {
139 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
141 if ($Job->{'running'}) {
142 croak("Job 'running' flag is already set");
144 if ($Job->{'started_at'}) {
145 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Build the job record from a JSON jobspec given on the command line.
151 $Job = JSON::decode_json($jobspec);
# Abort when any of the three required attributes is missing or false.
155 map { croak ("No $_ specified") unless $Job->{$_} }
156 qw(script script_version script_parameters);
# Mark the record as locked by the current user and stamp the start time
# before creating it through the API.
159 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
160 $Job->{'started_at'} = gmtime;
162 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
166 $job_id = $Job->{'uuid'};
# Open the log stream; log text is written to a file named after the job id.
168 $metastream = Warehouse::Stream->new(whc => new Warehouse);
170 $metastream->name('.');
171 $metastream->write_start($job_id . '.log.txt');
# Work out which nodes and how many slots per node are available.
# max_tasks_per_node defaults to 0, which the cap below treats as no limit.
174 $Job->{'runtime_constraints'} ||= {};
175 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
176 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
179 Log (undef, "check slurm allocation");
182 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Count processor entries in /proc/cpuinfo, falling back to 1, and record
# the result as a localhost entry in the same format sinfo produces.
186 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
187 push @sinfo, "$localcpus localhost";
# When SLURM_NODELIST is set, ask sinfo for the cpu count and node list.
189 if (exists $ENV{SLURM_NODELIST})
191 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Each entry is a cpu count followed by a node list; the cpu count is capped
# at max_tasks_per_node when that limit is non-zero.
195 my ($ncpus, $slurm_nodelist) = split;
196 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Consume the node list one comma-separated element at a time; an element
# may carry a bracketed range expression.
199 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
202 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
205 foreach (split (",", $ranges))
218 push @nodelist, map {
220 $n =~ s/\[[-,\d]+\]/$_/;
227 push @nodelist, $nodelist;
# Create one node record per node name and one slot record per cpu on it.
230 foreach my $nodename (@nodelist)
232 Log (undef, "node $nodename - $ncpus slots");
233 my $node = { name => $nodename,
237 foreach my $cpu (1..$ncpus)
239 push @slot, { node => $node,
243 push @node, @nodelist;
248 # Ensure that we get one jobstep running on each allocated node before
249 # we start overloading nodes with concurrent steps
251 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
258 # Claim this job, and make sure nobody else does
# The lock holder is a UUID string, so it must be compared with eq. A numeric
# comparison turns non-numeric strings into 0 and reports them all as equal,
# which would accept a lock held by a different user.
259 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
260 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
261 croak("Error while updating / locking job");
# Record the start time and an initial task summary on the job record.
263 $Job->update_attributes('started_at' => scalar gmtime,
266 'tasks_summary' => { 'failed' => 0,
273 Log (undef, "start");
# Signal handlers only set flags in package main, except TERM which calls
# croak directly. INT, QUIT and TSTP all set the same please_freeze flag.
274 $SIG{'INT'} = sub { $main::please_freeze = 1; };
275 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
276 $SIG{'TERM'} = \&croak;
277 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
278 $SIG{'ALRM'} = sub { $main::please_info = 1; };
279 $SIG{'CONT'} = sub { $main::please_continue = 1; };
280 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
# All request flags start cleared.
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 $main::please_refresh = 0;
286 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every knob line of the form NOCACHE...=value into the environment.
# grep is used here only for its side effect; its result is discarded.
288 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
# Export the job id under two variable names.
289 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
290 $ENV{"JOB_UUID"} = $job_id;
# Queues of jobstep indexes (positions in the jobstep array) plus assorted
# state shared with the subs further down the file.
294 my @jobstep_todo = ();
295 my @jobstep_done = ();
296 my @jobstep_tomerge = ();
297 my $jobstep_tomerge_level = 0;
299 my $squeue_kill_checked;
300 my $output_in_keep = 0;
301 my $latest_refresh = scalar time;
# A job carrying a thawedfromkey attribute is restored through thaw.
305 if (defined $Job->{thawedfromkey})
307 thaw ($Job->{thawedfromkey});
# Create the first task record through the API and queue it as jobstep 0
# at level 0.
# NOTE(review): presumably the branch taken when nothing was thawed; the
# enclosing else is not visible in this excerpt.
311 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312 'job_uuid' => $Job->{'uuid'},
317 push @jobstep, { 'level' => 0,
319 'arvados_task' => $first_task,
321 push @jobstep_todo, 0;
# Decide whether an install step is needed, then clean the work directories.
328 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# Installation is skipped for a local job whose script_version is an
# absolute path; that path is then used as CRUNCH_SRC.
330 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
333 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
# The build script text is read from the DATA handle.
# NOTE(review): whether the input record separator is changed before this
# read is not visible here.
339 $build_script = <DATA>;
341 Log (undef, "Install revision ".$Job->{script_version});
342 my $nodelist = join(",", @node);
344 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
# A forked process runs the cleanup command through srun on every node.
346 my $cleanpid = fork();
349 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
350 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# Poll for the cleanup process every 0.1 s, honouring a freeze request
# while waiting.
355 last if $cleanpid == waitpid (-1, WNOHANG);
356 freeze_if_want_freeze ($cleanpid);
357 select (undef, undef, undef, 0.1);
359 Log (undef, "Clean-work-dir exited $?");
361 # Install requested code version
# srun arguments used for the install step: all nodes, started in TMPDIR,
# with the job id as the SLURM job name.
364 my @srunargs = ("srun",
365 "--nodelist=$nodelist",
366 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
# Source and install locations are fixed subdirectories of CRUNCH_TMP.
368 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
369 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
370 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
# The repository comes from the git-dir option, falling back to the
# CRUNCH_DEFAULT_GIT_DIR environment variable.
374 my $treeish = $Job->{'script_version'};
375 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
376 # Todo: let script_version specify repository instead of expecting
377 # parent process to figure it out.
378 $ENV{"CRUNCH_SRC_URL"} = $repo;
380 # Create/update our clone of the remote git repo
# Clone only when the source directory does not exist yet; a failed clone
# is fatal.
382 if (!-d $ENV{"CRUNCH_SRC"}) {
383 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
384 or croak ("git clone $repo failed: exit ".($?>>8));
385 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
# Point origin at the repository URL and fetch. The URL is expanded by the
# shell from the environment rather than interpolated by Perl.
# NOTE(review): the exit status of this command is not inspected here.
387 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
389 # If this looks like a subversion r#, look for it in git-svn commit messages
# A script_version of one to four digits is searched for in the git-svn-id
# lines of the origin/master history.
391 if ($treeish =~ m{^\d{1,4}$}) {
392 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
# Only a single 40-character hex id is accepted as a match.
394 if ($gitlog =~ /^[a-f0-9]{40}$/) {
396 Log (undef, "Using commit $commit for script_version $treeish");
400 # If that didn't work, try asking git to look it up as a tree-ish.
402 if (!defined $commit) {
404 my $cooked_treeish = $treeish;
# Anything that is not at least five hex digits is prefixed with origin/.
405 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
406 # Looks like a git branch name -- make sure git knows it's
407 # relative to the remote repo
408 $cooked_treeish = "origin/$treeish";
# NOTE(review): the tree-ish is interpolated into a shell command line here
# without quoting.
411 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
413 if ($found =~ /^[0-9a-f]{40}$/s) {
415 if ($commit ne $treeish) {
416 # Make sure we record the real commit id in the database,
417 # frozentokey, logs, etc. -- instead of an abbreviation or a
418 # branch name which can become ambiguous or point to a
419 # different commit in the future.
420 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
421 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this test repeats the enclosing condition above.
422 if ($commit ne $treeish) {
423 $Job->{'script_version'} = $commit;
425 $Job->update_attributes('script_version' => $commit) or
426 croak("Error while updating job");
# With a resolved commit, prepare the remote command and the archive that
# will be fed to it; without one the job cannot proceed.
432 if (defined $commit) {
433 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
# The remote side creates the install directory and runs perl reading its
# program from standard input.
434 @execargs = ("sh", "-c",
435 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
436 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
439 croak ("could not figure out commit id for $treeish");
# A forked process runs the install through srun, sending the build script
# followed by the archive as input.
442 my $installpid = fork();
443 if ($installpid == 0)
445 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Poll for the install process every 0.1 s, honouring a freeze request
# while waiting.
450 last if $installpid == waitpid (-1, WNOHANG);
451 freeze_if_want_freeze ($installpid);
452 select (undef, undef, undef, 0.1);
454 Log (undef, "Install exited $?");
# Log the main job attributes; reference values are JSON-encoded first.
459 foreach (qw (script script_version script_parameters runtime_constraints))
463 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
# Log each knob line.
465 foreach (split (/\n/, $Job->{knobs}))
467 Log (undef, "knob " . $_);
# The overall outcome stays undefined until a round decides it.
472 $main::success = undef;
# Per-round counters.
478 my $thisround_succeeded = 0;
479 my $thisround_failed = 0;
480 my $thisround_failed_multiple = 0;
# Order pending jobsteps by level, then by index, and work on the lowest
# level present.
482 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
483 or $a <=> $b } @jobstep_todo;
484 my $level = $jobstep[$jobstep_todo[0]]->{level};
485 Log (undef, "start level $level");
# Every slot starts the round free.
490 my @freeslot = (0..$#slot);
# Force an initial progress report.
493 my $progress_is_dirty = 1;
494 my $progress_stats_updated = 0;
496 update_progress_stats();
# Walk the todo list and start one child process per pending jobstep.
501 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
503 my $id = $jobstep_todo[$todo_ptr];
504 my $Jobstep = $jobstep[$id];
# Jobsteps at a different level are handled separately (body not visible).
505 if ($Jobstep->{level} != $level)
# Create a pipe for the task output; the read end is stored per jobstep id
# and switched to non-blocking mode.
510 pipe $reader{$id}, "writer" or croak ($!);
511 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
512 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# The task takes the first free slot.
514 my $childslot = $freeslot[0];
515 my $childnode = $slot[$childslot]->{node};
516 my $childslotname = join (".",
517 $slot[$childslot]->{node}->{name},
518 $slot[$childslot]->{cpu});
519 my $childpid = fork();
# NOTE(review): the lines from here to the srun call presumably run in the
# forked child only; the guarding test is not visible in this excerpt.
# Restore default handling for the signals the parent traps.
522 $SIG{'INT'} = 'DEFAULT';
523 $SIG{'QUIT'} = 'DEFAULT';
524 $SIG{'TERM'} = 'DEFAULT';
526 foreach (values (%reader))
# Send both standard output and standard error into the pipe.
530 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
531 open(STDOUT,">&writer");
532 open(STDERR,">&writer");
# Environment handed to the task: identifiers, script name, one
# JOB_PARAMETER variable per script parameter (name upper-cased), and the
# slot the task was given.
537 delete $ENV{"GNUPGHOME"};
538 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
539 $ENV{"TASK_QSEQUENCE"} = $id;
540 $ENV{"TASK_SEQUENCE"} = $level;
541 $ENV{"JOB_SCRIPT"} = $Job->{script};
542 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
543 $param =~ tr/a-z/A-Z/;
544 $ENV{"JOB_PARAMETER_$param"} = $value;
546 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
547 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
548 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
549 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
550 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
556 "--nodelist=".$childnode->{name},
557 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
558 "--job-name=$job_id.$id.$$",
# NOTE(review): execargs is declared with my here and again further down.
560 my @execargs = qw(sh);
561 my $build_script_to_send = "";
# The remote command recreates the task work directory before running.
563 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
564 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
565 ."&& cd $ENV{CRUNCH_TMP} ";
568 $build_script_to_send = $build_script;
# Put SDK locations inside the source tree at the front of PYTHONPATH and a
# system-wide location at the end.
572 $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
573 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
574 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/arvados/sdk/python:}; # xxx hack
575 $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
# The command ends by exec-ing the job script from crunch_scripts.
577 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
578 my @execargs = ('bash', '-c', $command);
579 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# An undefined pid means fork itself failed.
583 if (!defined $childpid)
# Parent-side bookkeeping: remember the child by pid and mark the slot busy.
590 $proc{$childpid} = { jobstep => $id,
593 jobstepname => "$job_id.$id.$childpid",
595 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
596 $slot[$childslot]->{pid} = $childpid;
598 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
599 Log ($id, "child $childpid started on $childslotname");
# Reset the per-attempt fields of the jobstep.
600 $Jobstep->{starttime} = time;
601 $Jobstep->{node} = $childnode->{name};
602 $Jobstep->{slotindex} = $childslot;
603 delete $Jobstep->{stderr};
604 delete $Jobstep->{finishtime};
# Remove the started jobstep from the todo list.
606 splice @jobstep_todo, $todo_ptr, 1;
609 $progress_is_dirty = 1;
613 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
# Polling performed while tasks are running. A freeze request ends the
# round immediately.
615 last THISROUND if $main::please_freeze;
616 if ($main::please_info)
618 $main::please_info = 0;
622 update_progress_stats();
629 check_refresh_wanted();
631 update_progress_stats();
632 select (undef, undef, undef, 0.1);
# Progress is reported again once 30 seconds have passed since the last
# report.
634 elsif (time - $progress_stats_updated >= 30)
636 update_progress_stats();
# Give up on the round when repeat failures reach 8 with no success, or
# reach 16 and outnumber the successes.
638 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
639 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
641 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
642 .($thisround_failed+$thisround_succeeded)
643 .") -- giving up on this round";
644 Log (undef, $message);
648 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops iterate from the last index down so that splice does not
# disturb the indexes still to be visited.
649 for (my $i=$#freeslot; $i>=0; $i--) {
650 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
651 push @holdslot, (splice @freeslot, $i, 1);
654 for (my $i=$#holdslot; $i>=0; $i--) {
655 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
656 push @freeslot, (splice @holdslot, $i, 1);
660 # give up if no nodes are succeeding
# A node counts as usable when its losing streak is zero and it has been
# put on hold fewer than four times.
661 if (!grep { $_->{node}->{losing_streak} == 0 &&
662 $_->{node}->{hold_count} < 4 } @slot) {
663 my $message = "Every node has failed -- giving up on this round";
664 Log (undef, $message);
# Return all held slots to the free list and clear the losing streak of
# every node that owns a free slot.
671 push @freeslot, splice @holdslot;
672 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
# Drain the children that are still running at the end of the round.
675 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
678 if ($main::please_continue) {
679 $main::please_continue = 0;
# An info request clears its flag, then calls freeze, collate_output and
# save_meta in checkpoint mode.
682 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
686 check_refresh_wanted();
688 update_progress_stats();
689 select (undef, undef, undef, 0.1);
# A freeze request signals every remaining child.
690 killem (keys %proc) if $main::please_freeze;
694 update_progress_stats();
695 freeze_if_want_freeze();
# Decide the overall outcome if it has not been decided yet.
698 if (!defined $main::success)
701 $thisround_succeeded == 0 &&
702 ($thisround_failed == 0 || $thisround_failed > 4))
704 my $message = "stop because $thisround_failed tasks failed and none succeeded";
705 Log (undef, $message);
# Run another level while the outcome is still undecided.
714 goto ONELEVEL if !defined $main::success;
717 release_allocation();
# Store the collated output, the success flag and the finish time on the
# job record. Success requires both an output and a true overall result.
720 $Job->update_attributes('output' => &collate_output(),
722 'success' => $Job->{'output'} && $main::success,
723 'finished_at' => scalar gmtime)
# When there is an output, fetch its manifest text with whget and register
# it as a collection through the API.
726 if ($Job->{'output'})
729 my $manifest_text = capturex("whget", $Job->{'output'});
730 $arv->{'collections'}->{'create'}->execute('collection' => {
731 'uuid' => $Job->{'output'},
732 'manifest_text' => $manifest_text,
# A registration failure is logged rather than treated as fatal.
736 Log (undef, "Failed to register output manifest: $@");
740 Log (undef, "finish");
# Report task counts (todo, done, running) to the log and the job record.
# Takes no arguments. Always refreshes the last-report timestamp, but does
# the rest only when progress has been marked dirty since the last report.
747 sub update_progress_stats
749 $progress_stats_updated = time;
750 return if !$progress_is_dirty;
# Running is derived: all slots minus the free ones minus the held ones.
751 my ($todo, $done, $running) = (scalar @jobstep_todo,
752 scalar @jobstep_done,
753 scalar @slot - scalar @freeslot - scalar @holdslot);
754 $Job->{'tasks_summary'} ||= {};
755 $Job->{'tasks_summary'}->{'todo'} = $todo;
756 $Job->{'tasks_summary'}->{'done'} = $done;
757 $Job->{'tasks_summary'}->{'running'} = $running;
# NOTE(review): any condition guarding this API update is not visible in
# this excerpt.
759 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
761 Log (undef, "status: $done done, $running running, $todo todo");
762 $progress_is_dirty = 0;
# Body of the routine that collects one finished child process.
# NOTE(review): the sub header for this body is not visible in this excerpt.
# Returns 0 when no child has exited; waitpid is called without blocking.
769 my $pid = waitpid (-1, WNOHANG);
770 return 0 if $pid <= 0;
# Look up which slot and jobstep the pid belongs to.
772 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
774 . $slot[$proc{$pid}->{slot}]->{cpu});
775 my $jobstepid = $proc{$pid}->{jobstep};
776 my $elapsed = time - $proc{$pid}->{time};
777 my $Jobstep = $jobstep[$jobstepid];
# Decode the wait status: the exit value is in the high byte and bit 128
# flags a core dump.
779 my $childstatus = $?;
780 my $exitvalue = $childstatus >> 8;
781 my $exitinfo = sprintf("exit %d signal %d%s",
784 ($childstatus & 128 ? ' core dump' : ''));
# Success is taken from the freshly reloaded task record, not from the
# exit status.
785 $Jobstep->{'arvados_task'}->reload;
786 my $task_success = $Jobstep->{'arvados_task'}->{success};
788 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
790 if (!defined $task_success) {
791 # task did not indicate one way or the other --> fail
792 $Jobstep->{'arvados_task'}->{success} = 0;
793 $Jobstep->{'arvados_task'}->save;
# A failure is temporary when the node was flagged as failed or the exit
# value is 111.
800 $temporary_fail ||= $Jobstep->{node_fail};
801 $temporary_fail ||= ($exitvalue == 111);
# Count repeat failures: this jobstep had already failed at least once.
804 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
806 # Check for signs of a failed or misconfigured node
# The losing streak is incremented here and compared with the slot count of
# the node plus two.
807 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
808 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
809 # Don't count this against jobstep failure thresholds if this
810 # node is already suspected faulty and srun exited quickly
811 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
813 Log ($jobstepid, "blaming failure on suspect node " .
814 $slot[$proc{$pid}->{slot}]->{node}->{name});
815 $temporary_fail ||= 1;
817 ban_node_by_slot($proc{$pid}->{slot});
# The failure counter of the jobstep is incremented as part of this log
# call.
820 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
821 ++$Jobstep->{'failures'},
822 $temporary_fail ? 'temporary ' : 'permanent',
# A permanent failure, or a third failure of any kind, stops the job.
825 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
826 # Give up on this task, and the whole job
828 $main::please_freeze = 1;
831 # Put this task back on the todo queue
832 push @jobstep_todo, $jobstepid;
834 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: clear the failure state of the node and record the jobstep
# as done.
838 ++$thisround_succeeded;
839 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
840 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
841 push @jobstep_done, $jobstepid;
842 Log ($jobstepid, "success in $elapsed seconds");
# Common tail: record exit details and flush remaining stderr text.
844 $Jobstep->{exitcode} = $childstatus;
845 $Jobstep->{finishtime} = time;
846 process_stderr ($jobstepid, $task_success);
847 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the pipe and return the slot to the free list.
849 close $reader{$jobstepid};
850 delete $reader{$jobstepid};
851 delete $slot[$proc{$pid}->{slot}]->{pid};
852 push @freeslot, $proc{$pid}->{slot};
# Queue any tasks that the finished task created, ordered by qsequence.
856 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
858 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
860 'order' => 'qsequence'
862 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
864 'level' => $arvados_task->{'sequence'},
866 'arvados_task' => $arvados_task
868 push @jobstep, $jobstep;
869 push @jobstep_todo, $#jobstep;
872 $progress_is_dirty = 1;
# Re-read the cancellation attributes of the job when the refresh trigger
# file has been modified since the last refresh. Takes no arguments.
876 sub check_refresh_wanted
# Element 9 of the stat list is the modification time of the file named by
# CRUNCH_REFRESH_TRIGGER; an empty list means the stat call failed.
878 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
879 if (@stat && $stat[9] > $latest_refresh) {
880 $latest_refresh = scalar time;
# Fetch a fresh copy of the job and copy over the cancellation attributes
# only.
# NOTE(review): any condition guarding this lookup is not visible in this
# excerpt.
882 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
883 for my $attr ('cancelled_at',
884 'cancelled_by_user_uuid',
885 'cancelled_by_client_uuid') {
886 $Job->{$attr} = $Job2->{$attr};
# A cancelled job is stopped through the same flag the freeze signals set.
888 if ($Job->{'cancelled_at'}) {
889 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
890 " by user " . $Job->{cancelled_by_user_uuid});
892 $main::please_freeze = 1;
900 # return if the kill list was checked <4 seconds ago
901 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
905 $squeue_kill_checked = time;
907 # use killem() on procs whose killtime is reached
910 if (exists $proc{$_}->{killtime}
911 && $proc{$_}->{killtime} <= time)
917 # return if the squeue was checked <60 seconds ago
918 if (defined $squeue_checked && $squeue_checked > time - 60)
922 $squeue_checked = time;
926 # here is an opportunity to check for mysterious problems with local procs
930 # get a list of steps still running
931 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
933 if ($squeue[-1] ne "ok")
939 # which of my jobsteps are running, according to squeue?
943 if (/^(\d+)\.(\d+) (\S+)/)
945 if ($1 eq $ENV{SLURM_JOBID})
952 # which of my active child procs (>60s old) were not mentioned by squeue?
955 if ($proc{$_}->{time} < time - 60
956 && !exists $ok{$proc{$_}->{jobstepname}}
957 && !exists $proc{$_}->{killtime})
959 # kill this proc if it hasn't exited in 30 seconds
960 $proc{$_}->{killtime} = time + 30;
# Log the release and cancel the SLURM job named by SLURM_JOBID.
# Takes no arguments.
# NOTE(review): any condition guarding the body is not visible in this
# excerpt, and the scancel command line is built by string interpolation.
966 sub release_allocation
970 Log (undef, "release job allocation");
971 system "scancel $ENV{SLURM_JOBID}";
979 foreach my $job (keys %reader)
982 while (0 < sysread ($reader{$job}, $buf, 8192))
984 print STDERR $buf if $ENV{CRUNCH_DEBUG};
985 $jobstep[$job]->{stderr} .= $buf;
986 preprocess_stderr ($job);
987 if (length ($jobstep[$job]->{stderr}) > 16384)
989 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Log every complete line buffered in the stderr text of a jobstep and react
# to known srun error messages. Text after the last newline stays buffered.
# NOTE(review): the lines that unpack the argument into the job variable and
# capture the matched line are not visible in this excerpt.
998 sub preprocess_stderr
# Take one line at a time off the front of the buffer.
1002 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1004 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1005 Log ($job, "stderr $line");
# An expired or unconfirmed allocation stops the whole job.
# NOTE(review): this pattern reads SLURM_JOB_ID while other parts of the
# file read SLURM_JOBID.
1006 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1008 $main::please_freeze = 1;
# A node failure or a job step that cannot be created marks the jobstep and
# puts its node on hold.
1010 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1011 $jobstep[$job]->{node_fail} = 1;
1012 ban_node_by_slot($jobstep[$job]->{slotindex});
1021 my $task_success = shift;
1022 preprocess_stderr ($job);
1025 Log ($job, "stderr $_");
1026 } split ("\n", $jobstep[$job]->{stderr});
# Body of the routine that combines the outputs of the jobsteps into one job
# output.
# NOTE(review): the sub header and the enclosing loop are not visible in
# this excerpt.
1032 my $whc = Warehouse->new;
1033 Log (undef, "collate");
1034 $whc->write_start (1);
# Skip jobsteps that have no output, were not successful, or exited with a
# non-zero status.
1038 next if (!exists $_->{'arvados_task'}->{output} ||
1039 !$_->{'arvados_task'}->{'success'} ||
1040 $_->{'exitcode'} != 0);
1041 my $output = $_->{'arvados_task'}->{output};
# Output that is not a 32-hex-digit locator is written as literal data.
1042 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1044 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1045 $whc->write_data ($output);
# With exactly one jobstep, its locator becomes the job output directly.
1047 elsif (@jobstep == 1)
1049 $joboutput = $output;
# Otherwise the block behind the locator is fetched and its content
# appended.
1052 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1054 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1055 $whc->write_data ($outblock);
# A failed fetch is recorded inside the output as a marker line.
1059 my $errstr = $whc->errstr;
1060 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
# Finish the write unless a single locator was already chosen above.
1064 $joboutput = $whc->write_finish if !defined $joboutput;
1067 Log (undef, "output $joboutput");
1068 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1072 Log (undef, "output undef");
# Body of the routine that signals child processes, escalating from signal 2
# to 15 to 9.
# NOTE(review): the sub header and the loop over pids are not visible in
# this excerpt.
1082 my $sig = 2; # SIGINT first
# Escalate when the current signal was sent more than four seconds ago.
1083 if (exists $proc{$_}->{"sent_$sig"} &&
1084 time - $proc{$_}->{"sent_$sig"} > 4)
1086 $sig = 15; # SIGTERM if SIGINT doesn't work
1088 if (exists $proc{$_}->{"sent_$sig"} &&
1089 time - $proc{$_}->{"sent_$sig"} > 4)
1091 $sig = 9; # SIGKILL if SIGTERM doesn't work
# Each signal is sent only once per process; the send time is recorded
# under a key named after the signal number.
1093 if (!exists $proc{$_}->{"sent_$sig"})
1095 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1097 select (undef, undef, undef, 0.1);
1100 kill $sig, $_; # srun wants two SIGINT to really interrupt
1102 $proc{$_}->{"sent_$sig"} = time;
1103 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1113 vec($bits,fileno($_),1) = 1;
# Write one log line to standard error and, when the log stream is open, to
# that stream as well.
# Arguments: the jobstep id (or undef for job-level messages) and the
# message text.
1119 sub Log # ($jobstep_id, $logmessage)
# A message containing newlines is split and handled line by line.
1121 if ($_[1] =~ /\n/) {
1122 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1127 my $fh = select STDERR; $|=1; select $fh;
# Line layout: job id, process id, jobstep id, message.
1128 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Replace every character outside the range space to octal 176 with a
# backslash followed by its three-digit octal code.
1129 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A UTC timestamp is only built when it will be used: for the log stream or
# for a terminal.
1132 if ($metastream || -t STDERR) {
1133 my @gmtime = gmtime;
1134 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1135 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# The timestamp prefix is printed on STDERR only when it is a terminal.
1137 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1139 return if !$metastream;
1140 $metastream->write_data ($datetime . " " . $message);
1146 my ($package, $file, $line) = caller;
1147 my $message = "@_ at $file line $line\n";
1148 Log (undef, $message);
1149 freeze() if @jobstep_todo;
1150 collate_output() if @jobstep_todo;
1152 save_meta() if $metastream;
1159 return if !$job_has_uuid;
1160 $Job->update_attributes('running' => 0,
1162 'finished_at' => scalar gmtime);
# Body of the routine that stores the accumulated log and records its
# locator on the job.
# NOTE(review): the sub header is not visible in this excerpt.
1168 my $justcheckpoint = shift; # false if this will be the last meta saved
# For a checkpoint, work on a copy so the live stream can keep growing.
1169 my $m = $metastream;
1170 $m = $m->copy if $justcheckpoint;
# Store the stream text as a block and register it as a collection whose
# uuid is the resulting locator.
1172 my $whc = Warehouse->new;
1173 my $loglocator = $whc->store_block ($m->as_string);
1174 $arv->{'collections'}->{'create'}->execute('collection' => {
1175 'uuid' => $loglocator,
1176 'manifest_text' => $m->as_string,
1178 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1179 Log (undef, "log manifest is $loglocator");
# Record the locator locally, and through the API when the job has a UUID.
1180 $Job->{'log'} = $loglocator;
1181 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# Act on a pending freeze request: release the allocation, signal the
# children and reap those that exit.
# Arguments: optional extra pids to include among the processes to kill.
# Does nothing visible here unless the please_freeze flag is set.
1185 sub freeze_if_want_freeze
1187 if ($main::please_freeze)
1189 release_allocation();
1192 # kill some srun procs before freeze+stop
# Register each pid passed in with an empty record so that it is included
# in the kill below. map is used here only for its side effect.
1193 map { $proc{$_} = {} } @_;
1196 killem (keys %proc);
1197 select (undef, undef, undef, 0.1);
# Reap every child that has already exited and forget it.
1199 while (($died = waitpid (-1, WNOHANG)) > 0)
1201 delete $proc{$died};
1216 Log (undef, "Freeze not implemented");
1223 croak ("Thaw not implemented");
1227 Log (undef, "thaw from $key");
1232 @jobstep_tomerge = ();
1233 $jobstep_tomerge_level = 0;
1236 my $stream = new Warehouse::Stream ( whc => $whc,
1237 hash => [split (",", $key)] );
1239 while (my $dataref = $stream->read_until (undef, "\n\n"))
1241 if ($$dataref =~ /^job /)
1243 foreach (split ("\n", $$dataref))
1245 my ($k, $v) = split ("=", $_, 2);
1246 $frozenjob->{$k} = freezeunquote ($v);
1251 if ($$dataref =~ /^merge (\d+) (.*)/)
1253 $jobstep_tomerge_level = $1;
1255 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1260 foreach (split ("\n", $$dataref))
1262 my ($k, $v) = split ("=", $_, 2);
1263 $Jobstep->{$k} = freezeunquote ($v) if $k;
1265 $Jobstep->{'failures'} = 0;
1266 push @jobstep, $Jobstep;
1268 if ($Jobstep->{exitcode} eq "0")
1270 push @jobstep_done, $#jobstep;
1274 push @jobstep_todo, $#jobstep;
1278 foreach (qw (script script_version script_parameters))
1280 $Job->{$_} = $frozenjob->{$_};
1282 $Job->save if $job_has_uuid;
1298 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Body of the routine that runs a command, through srun when SLURM is
# available and directly otherwise.
# NOTE(review): the sub header and the line that unpacks the stdin argument
# are not visible in this excerpt.
# Arguments, in order: srun argument list ref, command argument list ref,
# options hash ref (defaults to an empty hash).
1305 my $srunargs = shift;
1306 my $execargs = shift;
1307 my $opts = shift || {};
# Without SLURM the srun arguments are dropped and the command runs as is.
1309 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# In debug mode the command line is echoed to STDERR, with single quotes
# around any argument that contains a space.
1310 print STDERR (join (" ",
1311 map { / / ? "'$_'" : $_ }
1314 if $ENV{CRUNCH_DEBUG};
# When input text was supplied, fork a helper connected to STDIN of this
# process; the helper prints the text and closes its output.
1316 if (defined $stdin) {
1317 my $child = open STDIN, "-|";
1318 defined $child or die "no fork: $!";
1320 print $stdin or die $!;
1321 close STDOUT or die $!;
# With the fork option the command runs through system and its status is
# returned.
1326 return system (@$args) if $opts->{fork};
# Reached only when the command could not be started.
1329 warn "ENV size is ".length(join(" ",%ENV));
1330 die "exec failed: $!: @$args";
# Put the node that owns the given slot on hold for 60 seconds and count the
# hold.
# NOTE(review): the line that unpacks the slot index argument is not visible
# in this excerpt.
1334 sub ban_node_by_slot {
1335 # Don't start any new jobsteps on this node for 60 seconds
1337 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1338 $slot[$slotid]->{node}->{hold_count}++;
1339 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1345 # checkout-and-build
1349 my $destdir = $ENV{"CRUNCH_SRC"};
1350 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1351 my $repo = $ENV{"CRUNCH_SRC_URL"};
1353 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1355 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1359 unlink "$destdir.commit";
1360 open STDOUT, ">", "$destdir.log";
1361 open STDERR, ">&STDOUT";
1364 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1367 die "'tar -C $destdir -xf -' exited $?: $!";
1371 chomp ($pwd = `pwd`);
1372 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1374 if (-e "$destdir/crunch_scripts/install") {
1375 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1376 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1378 shell_or_die ("./tests/autotests.sh", $install_dir);
1379 } elsif (-e "./install.sh") {
1380 shell_or_die ("./install.sh", $install_dir);
1384 unlink "$destdir.commit.new";
1385 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1386 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1395 if ($ENV{"DEBUG"}) {
1396 print STDERR "@_\n";
1399 or die "@_ failed: $! exit 0x".sprintf("%x",$?);