2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by the scheduler on the controller):
13 crunch-job --job x-y-z
15 Obtain job details from the command line, run tasks on the local machine
16 (typically invoked by an application or a developer on a VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or deallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel the job if cancelled_at becomes non-null).
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81 if ($ENV{"USER"} ne "crunch" && $< != 0) {
82 # use a tmp dir unique for my uid
83 $ENV{"CRUNCH_TMP"} .= "-$<";
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
88 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
89 mkdir ($ENV{"JOB_WORK"});
96 GetOptions('force-unlock' => \$force_unlock,
97 'git-dir=s' => \$git_dir,
99 'job-api-token=s' => \$job_api_token,
100 'resume-stash=s' => \$resume_stash,
103 if (defined $job_api_token) {
104 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
107 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
108 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
109 my $local_job = !$job_has_uuid;
114 $main::ENV{CRUNCH_DEBUG} = 1;
118 $main::ENV{CRUNCH_DEBUG} = 0;
123 my $arv = Arvados->new('apiVersion' => 'v1');
126 my $User = $arv->{'users'}->{'current'}->execute;
134 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
135 if (!$force_unlock) {
136 if ($Job->{'is_locked_by_uuid'}) {
137 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
139 if (defined $Job->{'success'}) { # refuse to rerun a job whose success flag is already set; 'ne undef' only compared against '' and warned
140 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
142 if ($Job->{'running'}) {
143 croak("Job 'running' flag is already set");
145 if ($Job->{'started_at'}) {
146 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
152 $Job = JSON::decode_json($jobspec);
156 map { croak ("No $_ specified") unless $Job->{$_} }
157 qw(script script_version script_parameters);
160 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
161 $Job->{'started_at'} = gmtime;
163 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
167 $job_id = $Job->{'uuid'};
169 $metastream = Warehouse::Stream->new(whc => new Warehouse);
171 $metastream->name('.');
172 $metastream->write_start($job_id . '.log.txt');
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
180 Log (undef, "check slurm allocation");
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
187 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188 push @sinfo, "$localcpus localhost";
190 if (exists $ENV{SLURM_NODELIST})
192 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
196 my ($ncpus, $slurm_nodelist) = split;
197 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
200 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
203 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
206 foreach (split (",", $ranges))
219 push @nodelist, map {
221 $n =~ s/\[[-,\d]+\]/$_/;
228 push @nodelist, $nodelist;
231 foreach my $nodename (@nodelist)
233 Log (undef, "node $nodename - $ncpus slots");
234 my $node = { name => $nodename,
238 foreach my $cpu (1..$ncpus)
240 push @slot, { node => $node,
244 push @node, @nodelist;
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259 # Claim this job, and make sure nobody else does
260 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) && # claim the lock, then confirm we hold it
261 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) { # UUIDs are strings: '==' numified both to 0 and always matched
262 croak("Error while updating / locking job");
264 $Job->update_attributes('started_at' => scalar gmtime,
267 'tasks_summary' => { 'failed' => 0,
274 Log (undef, "start");
275 $SIG{'INT'} = sub { $main::please_freeze = 1; };
276 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
277 $SIG{'TERM'} = \&croak;
278 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
279 $SIG{'ALRM'} = sub { $main::please_info = 1; };
280 $SIG{'CONT'} = sub { $main::please_continue = 1; };
281 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
283 $main::please_freeze = 0;
284 $main::please_info = 0;
285 $main::please_continue = 0;
286 $main::please_refresh = 0;
287 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302 my $latest_refresh = scalar time;
306 if (defined $Job->{thawedfromkey})
308 thaw ($Job->{thawedfromkey});
312 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
313 'job_uuid' => $Job->{'uuid'},
318 push @jobstep, { 'level' => 0,
320 'arvados_task' => $first_task,
322 push @jobstep_todo, 0;
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
331 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
334 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
335 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
337 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
338 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
339 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
341 or croak ("setup.py in $src_path failed: exit ".($?>>8));
349 $build_script = <DATA>;
351 Log (undef, "Install revision ".$Job->{script_version});
352 my $nodelist = join(",", @node);
354 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
356 my $cleanpid = fork();
359 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
360 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
365 last if $cleanpid == waitpid (-1, WNOHANG);
366 freeze_if_want_freeze ($cleanpid);
367 select (undef, undef, undef, 0.1);
369 Log (undef, "Clean-work-dir exited $?");
371 # Install requested code version
374 my @srunargs = ("srun",
375 "--nodelist=$nodelist",
376 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
378 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
379 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
383 my $treeish = $Job->{'script_version'};
384 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
385 # Todo: let script_version specify repository instead of expecting
386 # parent process to figure it out.
387 $ENV{"CRUNCH_SRC_URL"} = $repo;
389 # Create/update our clone of the remote git repo
391 if (!-d $ENV{"CRUNCH_SRC"}) {
392 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
393 or croak ("git clone $repo failed: exit ".($?>>8));
394 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
396 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
398 # If this looks like a subversion r#, look for it in git-svn commit messages
400 if ($treeish =~ m{^\d{1,4}$}) {
401 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
403 if ($gitlog =~ /^[a-f0-9]{40}$/) {
405 Log (undef, "Using commit $commit for script_version $treeish");
409 # If that didn't work, try asking git to look it up as a tree-ish.
411 if (!defined $commit) {
413 my $cooked_treeish = $treeish;
414 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
415 # Looks like a git branch name -- make sure git knows it's
416 # relative to the remote repo
417 $cooked_treeish = "origin/$treeish";
420 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
422 if ($found =~ /^[0-9a-f]{40}$/s) {
424 if ($commit ne $treeish) {
425 # Make sure we record the real commit id in the database,
426 # frozentokey, logs, etc. -- instead of an abbreviation or a
427 # branch name which can become ambiguous or point to a
428 # different commit in the future.
429 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
430 Log (undef, "Using commit $commit for tree-ish $treeish");
431 if ($commit ne $treeish) {
432 $Job->{'script_version'} = $commit;
434 $Job->update_attributes('script_version' => $commit) or
435 croak("Error while updating job");
441 if (defined $commit) {
442 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
443 @execargs = ("sh", "-c",
444 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
445 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
448 croak ("could not figure out commit id for $treeish");
451 my $installpid = fork();
452 if ($installpid == 0)
454 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
459 last if $installpid == waitpid (-1, WNOHANG);
460 freeze_if_want_freeze ($installpid);
461 select (undef, undef, undef, 0.1);
463 Log (undef, "Install exited $?");
468 foreach (qw (script script_version script_parameters runtime_constraints))
472 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
474 foreach (split (/\n/, $Job->{knobs}))
476 Log (undef, "knob " . $_);
481 $main::success = undef;
487 my $thisround_succeeded = 0;
488 my $thisround_failed = 0;
489 my $thisround_failed_multiple = 0;
491 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
492 or $a <=> $b } @jobstep_todo;
493 my $level = $jobstep[$jobstep_todo[0]]->{level};
494 Log (undef, "start level $level");
499 my @freeslot = (0..$#slot);
502 my $progress_is_dirty = 1;
503 my $progress_stats_updated = 0;
505 update_progress_stats();
510 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
512 my $id = $jobstep_todo[$todo_ptr];
513 my $Jobstep = $jobstep[$id];
514 if ($Jobstep->{level} != $level)
519 pipe $reader{$id}, "writer" or croak ($!);
520 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
521 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
523 my $childslot = $freeslot[0];
524 my $childnode = $slot[$childslot]->{node};
525 my $childslotname = join (".",
526 $slot[$childslot]->{node}->{name},
527 $slot[$childslot]->{cpu});
528 my $childpid = fork();
531 $SIG{'INT'} = 'DEFAULT';
532 $SIG{'QUIT'} = 'DEFAULT';
533 $SIG{'TERM'} = 'DEFAULT';
535 foreach (values (%reader))
539 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
540 open(STDOUT,">&writer");
541 open(STDERR,">&writer");
546 delete $ENV{"GNUPGHOME"};
547 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
548 $ENV{"TASK_QSEQUENCE"} = $id;
549 $ENV{"TASK_SEQUENCE"} = $level;
550 $ENV{"JOB_SCRIPT"} = $Job->{script};
551 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
552 $param =~ tr/a-z/A-Z/;
553 $ENV{"JOB_PARAMETER_$param"} = $value;
555 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
556 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
557 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
558 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
559 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
560 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
561 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
567 "--nodelist=".$childnode->{name},
568 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
569 "--job-name=$job_id.$id.$$",
571 my @execargs = qw(sh);
572 my $build_script_to_send = "";
574 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
575 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
576 ."&& cd $ENV{CRUNCH_TMP} ";
579 $build_script_to_send = $build_script;
583 $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
584 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
585 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/arvados/sdk/python:}; # xxx hack
586 $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
588 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
589 my @execargs = ('bash', '-c', $command);
590 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
594 if (!defined $childpid)
601 $proc{$childpid} = { jobstep => $id,
604 jobstepname => "$job_id.$id.$childpid",
606 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
607 $slot[$childslot]->{pid} = $childpid;
609 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
610 Log ($id, "child $childpid started on $childslotname");
611 $Jobstep->{starttime} = time;
612 $Jobstep->{node} = $childnode->{name};
613 $Jobstep->{slotindex} = $childslot;
614 delete $Jobstep->{stderr};
615 delete $Jobstep->{finishtime};
617 splice @jobstep_todo, $todo_ptr, 1;
620 $progress_is_dirty = 1;
624 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
626 last THISROUND if $main::please_freeze;
627 if ($main::please_info)
629 $main::please_info = 0;
633 update_progress_stats();
640 check_refresh_wanted();
642 update_progress_stats();
643 select (undef, undef, undef, 0.1);
645 elsif (time - $progress_stats_updated >= 30)
647 update_progress_stats();
649 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
650 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
652 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
653 .($thisround_failed+$thisround_succeeded)
654 .") -- giving up on this round";
655 Log (undef, $message);
659 # move slots from freeslot to holdslot (or back to freeslot) if necessary
660 for (my $i=$#freeslot; $i>=0; $i--) {
661 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
662 push @holdslot, (splice @freeslot, $i, 1);
665 for (my $i=$#holdslot; $i>=0; $i--) {
666 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
667 push @freeslot, (splice @holdslot, $i, 1);
671 # give up if no nodes are succeeding
672 if (!grep { $_->{node}->{losing_streak} == 0 &&
673 $_->{node}->{hold_count} < 4 } @slot) {
674 my $message = "Every node has failed -- giving up on this round";
675 Log (undef, $message);
682 push @freeslot, splice @holdslot;
683 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
686 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
689 if ($main::please_continue) {
690 $main::please_continue = 0;
693 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
697 check_refresh_wanted();
699 update_progress_stats();
700 select (undef, undef, undef, 0.1);
701 killem (keys %proc) if $main::please_freeze;
705 update_progress_stats();
706 freeze_if_want_freeze();
709 if (!defined $main::success)
712 $thisround_succeeded == 0 &&
713 ($thisround_failed == 0 || $thisround_failed > 4))
715 my $message = "stop because $thisround_failed tasks failed and none succeeded";
716 Log (undef, $message);
725 goto ONELEVEL if !defined $main::success;
728 release_allocation();
731 $Job->update_attributes('output' => &collate_output(),
733 'success' => $Job->{'output'} && $main::success,
734 'finished_at' => scalar gmtime)
737 if ($Job->{'output'})
740 my $manifest_text = capturex("whget", $Job->{'output'});
741 $arv->{'collections'}->{'create'}->execute('collection' => {
742 'uuid' => $Job->{'output'},
743 'manifest_text' => $manifest_text,
747 Log (undef, "Failed to register output manifest: $@");
751 Log (undef, "finish");
758 sub update_progress_stats
760 $progress_stats_updated = time;
761 return if !$progress_is_dirty;
762 my ($todo, $done, $running) = (scalar @jobstep_todo,
763 scalar @jobstep_done,
764 scalar @slot - scalar @freeslot - scalar @holdslot);
765 $Job->{'tasks_summary'} ||= {};
766 $Job->{'tasks_summary'}->{'todo'} = $todo;
767 $Job->{'tasks_summary'}->{'done'} = $done;
768 $Job->{'tasks_summary'}->{'running'} = $running;
770 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
772 Log (undef, "status: $done done, $running running, $todo todo");
773 $progress_is_dirty = 0;
780 my $pid = waitpid (-1, WNOHANG);
781 return 0 if $pid <= 0;
783 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
785 . $slot[$proc{$pid}->{slot}]->{cpu});
786 my $jobstepid = $proc{$pid}->{jobstep};
787 my $elapsed = time - $proc{$pid}->{time};
788 my $Jobstep = $jobstep[$jobstepid];
790 my $childstatus = $?;
791 my $exitvalue = $childstatus >> 8;
792 my $exitinfo = sprintf("exit %d signal %d%s",
795 ($childstatus & 128 ? ' core dump' : ''));
796 $Jobstep->{'arvados_task'}->reload;
797 my $task_success = $Jobstep->{'arvados_task'}->{success};
799 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
801 if (!defined $task_success) {
802 # task did not indicate one way or the other --> fail
803 $Jobstep->{'arvados_task'}->{success} = 0;
804 $Jobstep->{'arvados_task'}->save;
811 $temporary_fail ||= $Jobstep->{node_fail};
812 $temporary_fail ||= ($exitvalue == 111);
815 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
817 # Check for signs of a failed or misconfigured node
818 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
819 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
820 # Don't count this against jobstep failure thresholds if this
821 # node is already suspected faulty and srun exited quickly
822 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
824 Log ($jobstepid, "blaming failure on suspect node " .
825 $slot[$proc{$pid}->{slot}]->{node}->{name});
826 $temporary_fail ||= 1;
828 ban_node_by_slot($proc{$pid}->{slot});
831 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
832 ++$Jobstep->{'failures'},
833 $temporary_fail ? 'temporary ' : 'permanent',
836 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
837 # Give up on this task, and the whole job
839 $main::please_freeze = 1;
842 # Put this task back on the todo queue
843 push @jobstep_todo, $jobstepid;
845 $Job->{'tasks_summary'}->{'failed'}++;
849 ++$thisround_succeeded;
850 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
851 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
852 push @jobstep_done, $jobstepid;
853 Log ($jobstepid, "success in $elapsed seconds");
855 $Jobstep->{exitcode} = $childstatus;
856 $Jobstep->{finishtime} = time;
857 process_stderr ($jobstepid, $task_success);
858 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
860 close $reader{$jobstepid};
861 delete $reader{$jobstepid};
862 delete $slot[$proc{$pid}->{slot}]->{pid};
863 push @freeslot, $proc{$pid}->{slot};
867 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
869 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
871 'order' => 'qsequence'
873 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
875 'level' => $arvados_task->{'sequence'},
877 'arvados_task' => $arvados_task
879 push @jobstep, $jobstep;
880 push @jobstep_todo, $#jobstep;
883 $progress_is_dirty = 1;
887 sub check_refresh_wanted
889 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
890 if (@stat && $stat[9] > $latest_refresh) {
891 $latest_refresh = scalar time;
893 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
894 for my $attr ('cancelled_at',
895 'cancelled_by_user_uuid',
896 'cancelled_by_client_uuid') {
897 $Job->{$attr} = $Job2->{$attr};
899 if ($Job->{'cancelled_at'}) {
900 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
901 " by user " . $Job->{cancelled_by_user_uuid});
903 $main::please_freeze = 1;
911 # return if the kill list was checked <4 seconds ago
912 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
916 $squeue_kill_checked = time;
918 # use killem() on procs whose killtime is reached
921 if (exists $proc{$_}->{killtime}
922 && $proc{$_}->{killtime} <= time)
928 # return if the squeue was checked <60 seconds ago
929 if (defined $squeue_checked && $squeue_checked > time - 60)
933 $squeue_checked = time;
937 # here is an opportunity to check for mysterious problems with local procs
941 # get a list of steps still running
942 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
944 if ($squeue[-1] ne "ok")
950 # which of my jobsteps are running, according to squeue?
954 if (/^(\d+)\.(\d+) (\S+)/)
956 if ($1 eq $ENV{SLURM_JOBID})
963 # which of my active child procs (>60s old) were not mentioned by squeue?
966 if ($proc{$_}->{time} < time - 60
967 && !exists $ok{$proc{$_}->{jobstepname}}
968 && !exists $proc{$_}->{killtime})
970 # kill this proc if it hasn't exited in 30 seconds
971 $proc{$_}->{killtime} = time + 30;
977 sub release_allocation
981 Log (undef, "release job allocation");
982 system "scancel $ENV{SLURM_JOBID}";
990 foreach my $job (keys %reader)
993 while (0 < sysread ($reader{$job}, $buf, 8192))
995 print STDERR $buf if $ENV{CRUNCH_DEBUG};
996 $jobstep[$job]->{stderr} .= $buf;
997 preprocess_stderr ($job);
998 if (length ($jobstep[$job]->{stderr}) > 16384)
1000 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1009 sub preprocess_stderr
1013 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1015 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1016 Log ($job, "stderr $line");
1017 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1019 $main::please_freeze = 1;
1021 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1022 $jobstep[$job]->{node_fail} = 1;
1023 ban_node_by_slot($jobstep[$job]->{slotindex});
1032 my $task_success = shift;
1033 preprocess_stderr ($job);
1036 Log ($job, "stderr $_");
1037 } split ("\n", $jobstep[$job]->{stderr});
1043 my $whc = Warehouse->new;
1044 Log (undef, "collate");
1045 $whc->write_start (1);
1049 next if (!exists $_->{'arvados_task'}->{output} ||
1050 !$_->{'arvados_task'}->{'success'} ||
1051 $_->{'exitcode'} != 0);
1052 my $output = $_->{'arvados_task'}->{output};
1053 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1055 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1056 $whc->write_data ($output);
1058 elsif (@jobstep == 1)
1060 $joboutput = $output;
1063 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1065 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1066 $whc->write_data ($outblock);
1070 my $errstr = $whc->errstr;
1071 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1075 $joboutput = $whc->write_finish if !defined $joboutput;
1078 Log (undef, "output $joboutput");
1079 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1083 Log (undef, "output undef");
1093 my $sig = 2; # SIGINT first
1094 if (exists $proc{$_}->{"sent_$sig"} &&
1095 time - $proc{$_}->{"sent_$sig"} > 4)
1097 $sig = 15; # SIGTERM if SIGINT doesn't work
1099 if (exists $proc{$_}->{"sent_$sig"} &&
1100 time - $proc{$_}->{"sent_$sig"} > 4)
1102 $sig = 9; # SIGKILL if SIGTERM doesn't work
1104 if (!exists $proc{$_}->{"sent_$sig"})
1106 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1108 select (undef, undef, undef, 0.1);
1111 kill $sig, $_; # srun wants two SIGINT to really interrupt
1113 $proc{$_}->{"sent_$sig"} = time;
1114 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1124 vec($bits,fileno($_),1) = 1;
1130 sub Log # ($jobstep_id, $logmessage)
1132 if ($_[1] =~ /\n/) {
1133 for my $line (split (/\n/, $_[1])) {
1138 my $fh = select STDERR; $|=1; select $fh;
1139 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1140 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1143 if ($metastream || -t STDERR) {
1144 my @gmtime = gmtime;
1145 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1146 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1148 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1150 return if !$metastream;
1151 $metastream->write_data ($datetime . " " . $message);
1157 my ($package, $file, $line) = caller;
1158 my $message = "@_ at $file line $line\n";
1159 Log (undef, $message);
1160 freeze() if @jobstep_todo;
1161 collate_output() if @jobstep_todo;
1163 save_meta() if $metastream;
1170 return if !$job_has_uuid;
1171 $Job->update_attributes('running' => 0,
1173 'finished_at' => scalar gmtime);
1179 my $justcheckpoint = shift; # false if this will be the last meta saved
1180 my $m = $metastream;
1181 $m = $m->copy if $justcheckpoint;
1183 my $whc = Warehouse->new;
1184 my $loglocator = $whc->store_block ($m->as_string);
1185 $arv->{'collections'}->{'create'}->execute('collection' => {
1186 'uuid' => $loglocator,
1187 'manifest_text' => $m->as_string,
1189 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1190 Log (undef, "log manifest is $loglocator");
1191 $Job->{'log'} = $loglocator;
1192 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1196 sub freeze_if_want_freeze
1198 if ($main::please_freeze)
1200 release_allocation();
1203 # kill some srun procs before freeze+stop
1204 map { $proc{$_} = {} } @_;
1207 killem (keys %proc);
1208 select (undef, undef, undef, 0.1);
1210 while (($died = waitpid (-1, WNOHANG)) > 0)
1212 delete $proc{$died};
1227 Log (undef, "Freeze not implemented");
1234 croak ("Thaw not implemented");
1238 Log (undef, "thaw from $key");
1243 @jobstep_tomerge = ();
1244 $jobstep_tomerge_level = 0;
1247 my $stream = new Warehouse::Stream ( whc => $whc,
1248 hash => [split (",", $key)] );
1250 while (my $dataref = $stream->read_until (undef, "\n\n"))
1252 if ($$dataref =~ /^job /)
1254 foreach (split ("\n", $$dataref))
1256 my ($k, $v) = split ("=", $_, 2);
1257 $frozenjob->{$k} = freezeunquote ($v);
1262 if ($$dataref =~ /^merge (\d+) (.*)/)
1264 $jobstep_tomerge_level = $1;
1266 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1271 foreach (split ("\n", $$dataref))
1273 my ($k, $v) = split ("=", $_, 2);
1274 $Jobstep->{$k} = freezeunquote ($v) if $k;
1276 $Jobstep->{'failures'} = 0;
1277 push @jobstep, $Jobstep;
1279 if ($Jobstep->{exitcode} eq "0")
1281 push @jobstep_done, $#jobstep;
1285 push @jobstep_todo, $#jobstep;
1289 foreach (qw (script script_version script_parameters))
1291 $Job->{$_} = $frozenjob->{$_};
1293 $Job->save if $job_has_uuid;
1309 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1316 my $srunargs = shift;
1317 my $execargs = shift;
1318 my $opts = shift || {};
1320 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1321 print STDERR (join (" ",
1322 map { / / ? "'$_'" : $_ }
1325 if $ENV{CRUNCH_DEBUG};
1327 if (defined $stdin) {
1328 my $child = open STDIN, "-|";
1329 defined $child or die "no fork: $!";
1331 print $stdin or die $!;
1332 close STDOUT or die $!;
1337 return system (@$args) if $opts->{fork};
1340 warn "ENV size is ".length(join(" ",%ENV));
1341 die "exec failed: $!: @$args";
1345 sub ban_node_by_slot {
1346 # Don't start any new jobsteps on this node for 60 seconds
1348 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1349 $slot[$slotid]->{node}->{hold_count}++;
1350 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1356 # checkout-and-build
1360 my $destdir = $ENV{"CRUNCH_SRC"};
1361 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1362 my $repo = $ENV{"CRUNCH_SRC_URL"};
1364 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1366 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1370 unlink "$destdir.commit";
1371 open STDOUT, ">", "$destdir.log";
1372 open STDERR, ">&STDOUT";
1375 my @git_archive_data = <DATA>;
1376 if (@git_archive_data) {
1377 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1378 print TARX @git_archive_data;
1380 die "'tar -C $destdir -xf -' exited $?: $!";
1385 chomp ($pwd = `pwd`);
1386 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1389 for my $src_path ("$destdir/arvados/sdk/python") {
1391 shell_or_die ("virtualenv", $install_dir);
1392 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1396 if (-e "$destdir/crunch_scripts/install") {
1397 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1398 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1400 shell_or_die ("./tests/autotests.sh", $install_dir);
1401 } elsif (-e "./install.sh") {
1402 shell_or_die ("./install.sh", $install_dir);
1406 unlink "$destdir.commit.new";
1407 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1408 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1417 if ($ENV{"DEBUG"}) {
1418 print STDERR "@_\n";
1421 or die "@_ failed: $! exit 0x".sprintf("%x",$?);