2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
81 use Warehouse::Stream;
82 use IPC::System::Simple qw(capturex);
84 $ENV{"TMPDIR"} ||= "/tmp";
85 unless (defined $ENV{"CRUNCH_TMP"}) {
86 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
87 if ($ENV{"USER"} ne "crunch" && $< != 0) {
88 # use a tmp dir unique for my uid
89 $ENV{"CRUNCH_TMP"} .= "-$<";
92 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
93 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
94 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
95 mkdir ($ENV{"JOB_WORK"});
103 GetOptions('force-unlock' => \$force_unlock,
104 'git-dir=s' => \$git_dir,
105 'job=s' => \$jobspec,
106 'job-api-token=s' => \$job_api_token,
107 'no-clear-tmp' => \$no_clear_tmp,
108 'resume-stash=s' => \$resume_stash,
111 if (defined $job_api_token) {
112 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
115 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
116 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
117 my $local_job = !$job_has_uuid;
122 $main::ENV{CRUNCH_DEBUG} = 1;
126 $main::ENV{CRUNCH_DEBUG} = 0;
131 my $arv = Arvados->new('apiVersion' => 'v1');
134 my $User = $arv->{'users'}->{'current'}->execute;
142 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
143 if (!$force_unlock) {
144 if ($Job->{'is_locked_by_uuid'}) {
145 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
147 if ($Job->{'success'} ne undef) {
148 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
150 if ($Job->{'running'}) {
151 croak("Job 'running' flag is already set");
153 if ($Job->{'started_at'}) {
154 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
160 $Job = JSON::decode_json($jobspec);
164 map { croak ("No $_ specified") unless $Job->{$_} }
165 qw(script script_version script_parameters);
168 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
169 $Job->{'started_at'} = gmtime;
171 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
175 $job_id = $Job->{'uuid'};
177 $metastream = Warehouse::Stream->new(whc => new Warehouse);
179 $metastream->name('.');
180 $metastream->write_start($job_id . '.log.txt');
183 $Job->{'runtime_constraints'} ||= {};
184 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
185 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
188 Log (undef, "check slurm allocation");
191 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
195 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
196 push @sinfo, "$localcpus localhost";
198 if (exists $ENV{SLURM_NODELIST})
200 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
204 my ($ncpus, $slurm_nodelist) = split;
205 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
208 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
211 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
214 foreach (split (",", $ranges))
227 push @nodelist, map {
229 $n =~ s/\[[-,\d]+\]/$_/;
236 push @nodelist, $nodelist;
239 foreach my $nodename (@nodelist)
241 Log (undef, "node $nodename - $ncpus slots");
242 my $node = { name => $nodename,
246 foreach my $cpu (1..$ncpus)
248 push @slot, { node => $node,
252 push @node, @nodelist;
257 # Ensure that we get one jobstep running on each allocated node before
258 # we start overloading nodes with concurrent steps
260 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
267 # Claim this job, and make sure nobody else does
268 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
269 $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
270 croak("Error while updating / locking job");
272 $Job->update_attributes('started_at' => scalar gmtime,
275 'tasks_summary' => { 'failed' => 0,
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
303 my @jobstep_todo = ();
304 my @jobstep_done = ();
305 my @jobstep_tomerge = ();
306 my $jobstep_tomerge_level = 0;
308 my $squeue_kill_checked;
309 my $output_in_keep = 0;
310 my $latest_refresh = scalar time;
314 if (defined $Job->{thawedfromkey})
316 thaw ($Job->{thawedfromkey});
320 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
321 'job_uuid' => $Job->{'uuid'},
326 push @jobstep, { 'level' => 0,
328 'arvados_task' => $first_task,
330 push @jobstep_todo, 0;
337 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
339 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
342 if (!defined $no_clear_tmp) {
343 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
344 system($clear_tmp_cmd) == 0
345 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
347 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
348 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
350 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
351 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
352 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
354 or croak ("setup.py in $src_path failed: exit ".($?>>8));
362 $build_script = <DATA>;
364 Log (undef, "Install revision ".$Job->{script_version});
365 my $nodelist = join(",", @node);
367 if (!defined $no_clear_tmp) {
368 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
370 my $cleanpid = fork();
373 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
374 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
379 last if $cleanpid == waitpid (-1, WNOHANG);
380 freeze_if_want_freeze ($cleanpid);
381 select (undef, undef, undef, 0.1);
383 Log (undef, "Clean-work-dir exited $?");
386 # Install requested code version
389 my @srunargs = ("srun",
390 "--nodelist=$nodelist",
391 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
393 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
394 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
398 my $treeish = $Job->{'script_version'};
399 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
400 # Todo: let script_version specify repository instead of expecting
401 # parent process to figure it out.
402 $ENV{"CRUNCH_SRC_URL"} = $repo;
404 # Create/update our clone of the remote git repo
406 if (!-d $ENV{"CRUNCH_SRC"}) {
407 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
408 or croak ("git clone $repo failed: exit ".($?>>8));
409 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
411 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
413 # If this looks like a subversion r#, look for it in git-svn commit messages
415 if ($treeish =~ m{^\d{1,4}$}) {
416 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
418 if ($gitlog =~ /^[a-f0-9]{40}$/) {
420 Log (undef, "Using commit $commit for script_version $treeish");
424 # If that didn't work, try asking git to look it up as a tree-ish.
426 if (!defined $commit) {
428 my $cooked_treeish = $treeish;
429 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
430 # Looks like a git branch name -- make sure git knows it's
431 # relative to the remote repo
432 $cooked_treeish = "origin/$treeish";
435 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
437 if ($found =~ /^[0-9a-f]{40}$/s) {
439 if ($commit ne $treeish) {
440 # Make sure we record the real commit id in the database,
441 # frozentokey, logs, etc. -- instead of an abbreviation or a
442 # branch name which can become ambiguous or point to a
443 # different commit in the future.
444 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
445 Log (undef, "Using commit $commit for tree-ish $treeish");
446 if ($commit ne $treeish) {
447 $Job->{'script_version'} = $commit;
449 $Job->update_attributes('script_version' => $commit) or
450 croak("Error while updating job");
456 if (defined $commit) {
457 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
458 @execargs = ("sh", "-c",
459 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
460 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
463 croak ("could not figure out commit id for $treeish");
466 my $installpid = fork();
467 if ($installpid == 0)
469 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
474 last if $installpid == waitpid (-1, WNOHANG);
475 freeze_if_want_freeze ($installpid);
476 select (undef, undef, undef, 0.1);
478 Log (undef, "Install exited $?");
483 foreach (qw (script script_version script_parameters runtime_constraints))
487 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
489 foreach (split (/\n/, $Job->{knobs}))
491 Log (undef, "knob " . $_);
496 $main::success = undef;
502 my $thisround_succeeded = 0;
503 my $thisround_failed = 0;
504 my $thisround_failed_multiple = 0;
506 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
507 or $a <=> $b } @jobstep_todo;
508 my $level = $jobstep[$jobstep_todo[0]]->{level};
509 Log (undef, "start level $level");
514 my @freeslot = (0..$#slot);
517 my $progress_is_dirty = 1;
518 my $progress_stats_updated = 0;
520 update_progress_stats();
525 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
527 my $id = $jobstep_todo[$todo_ptr];
528 my $Jobstep = $jobstep[$id];
529 if ($Jobstep->{level} != $level)
534 pipe $reader{$id}, "writer" or croak ($!);
535 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
536 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
538 my $childslot = $freeslot[0];
539 my $childnode = $slot[$childslot]->{node};
540 my $childslotname = join (".",
541 $slot[$childslot]->{node}->{name},
542 $slot[$childslot]->{cpu});
543 my $childpid = fork();
546 $SIG{'INT'} = 'DEFAULT';
547 $SIG{'QUIT'} = 'DEFAULT';
548 $SIG{'TERM'} = 'DEFAULT';
550 foreach (values (%reader))
554 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
555 open(STDOUT,">&writer");
556 open(STDERR,">&writer");
561 delete $ENV{"GNUPGHOME"};
562 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
563 $ENV{"TASK_QSEQUENCE"} = $id;
564 $ENV{"TASK_SEQUENCE"} = $level;
565 $ENV{"JOB_SCRIPT"} = $Job->{script};
566 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
567 $param =~ tr/a-z/A-Z/;
568 $ENV{"JOB_PARAMETER_$param"} = $value;
570 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
571 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
572 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
573 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
574 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
575 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
576 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
582 "--nodelist=".$childnode->{name},
583 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
584 "--job-name=$job_id.$id.$$",
586 my @execargs = qw(sh);
587 my $build_script_to_send = "";
589 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
590 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
591 ."&& cd $ENV{CRUNCH_TMP} ";
594 $build_script_to_send = $build_script;
599 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
600 my @execargs = ('bash', '-c', $command);
601 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
605 if (!defined $childpid)
612 $proc{$childpid} = { jobstep => $id,
615 jobstepname => "$job_id.$id.$childpid",
617 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
618 $slot[$childslot]->{pid} = $childpid;
620 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
621 Log ($id, "child $childpid started on $childslotname");
622 $Jobstep->{starttime} = time;
623 $Jobstep->{node} = $childnode->{name};
624 $Jobstep->{slotindex} = $childslot;
625 delete $Jobstep->{stderr};
626 delete $Jobstep->{finishtime};
628 splice @jobstep_todo, $todo_ptr, 1;
631 $progress_is_dirty = 1;
635 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
637 last THISROUND if $main::please_freeze;
638 if ($main::please_info)
640 $main::please_info = 0;
644 update_progress_stats();
651 check_refresh_wanted();
653 update_progress_stats();
654 select (undef, undef, undef, 0.1);
656 elsif (time - $progress_stats_updated >= 30)
658 update_progress_stats();
660 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
661 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
663 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
664 .($thisround_failed+$thisround_succeeded)
665 .") -- giving up on this round";
666 Log (undef, $message);
670 # move slots from freeslot to holdslot (or back to freeslot) if necessary
671 for (my $i=$#freeslot; $i>=0; $i--) {
672 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
673 push @holdslot, (splice @freeslot, $i, 1);
676 for (my $i=$#holdslot; $i>=0; $i--) {
677 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
678 push @freeslot, (splice @holdslot, $i, 1);
682 # give up if no nodes are succeeding
683 if (!grep { $_->{node}->{losing_streak} == 0 &&
684 $_->{node}->{hold_count} < 4 } @slot) {
685 my $message = "Every node has failed -- giving up on this round";
686 Log (undef, $message);
693 push @freeslot, splice @holdslot;
694 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
697 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
700 if ($main::please_continue) {
701 $main::please_continue = 0;
704 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
708 check_refresh_wanted();
710 update_progress_stats();
711 select (undef, undef, undef, 0.1);
712 killem (keys %proc) if $main::please_freeze;
716 update_progress_stats();
717 freeze_if_want_freeze();
720 if (!defined $main::success)
723 $thisround_succeeded == 0 &&
724 ($thisround_failed == 0 || $thisround_failed > 4))
726 my $message = "stop because $thisround_failed tasks failed and none succeeded";
727 Log (undef, $message);
736 goto ONELEVEL if !defined $main::success;
739 release_allocation();
742 $Job->update_attributes('output' => &collate_output(),
744 'success' => $Job->{'output'} && $main::success,
745 'finished_at' => scalar gmtime)
748 if ($Job->{'output'})
751 my $manifest_text = capturex("whget", $Job->{'output'});
752 $arv->{'collections'}->{'create'}->execute('collection' => {
753 'uuid' => $Job->{'output'},
754 'manifest_text' => $manifest_text,
758 Log (undef, "Failed to register output manifest: $@");
762 Log (undef, "finish");
769 sub update_progress_stats
771 $progress_stats_updated = time;
772 return if !$progress_is_dirty;
773 my ($todo, $done, $running) = (scalar @jobstep_todo,
774 scalar @jobstep_done,
775 scalar @slot - scalar @freeslot - scalar @holdslot);
776 $Job->{'tasks_summary'} ||= {};
777 $Job->{'tasks_summary'}->{'todo'} = $todo;
778 $Job->{'tasks_summary'}->{'done'} = $done;
779 $Job->{'tasks_summary'}->{'running'} = $running;
781 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
783 Log (undef, "status: $done done, $running running, $todo todo");
784 $progress_is_dirty = 0;
791 my $pid = waitpid (-1, WNOHANG);
792 return 0 if $pid <= 0;
794 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
796 . $slot[$proc{$pid}->{slot}]->{cpu});
797 my $jobstepid = $proc{$pid}->{jobstep};
798 my $elapsed = time - $proc{$pid}->{time};
799 my $Jobstep = $jobstep[$jobstepid];
801 my $childstatus = $?;
802 my $exitvalue = $childstatus >> 8;
803 my $exitinfo = sprintf("exit %d signal %d%s",
806 ($childstatus & 128 ? ' core dump' : ''));
807 $Jobstep->{'arvados_task'}->reload;
808 my $task_success = $Jobstep->{'arvados_task'}->{success};
810 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
812 if (!defined $task_success) {
813 # task did not indicate one way or the other --> fail
814 $Jobstep->{'arvados_task'}->{success} = 0;
815 $Jobstep->{'arvados_task'}->save;
822 $temporary_fail ||= $Jobstep->{node_fail};
823 $temporary_fail ||= ($exitvalue == 111);
826 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
828 # Check for signs of a failed or misconfigured node
829 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
830 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
831 # Don't count this against jobstep failure thresholds if this
832 # node is already suspected faulty and srun exited quickly
833 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
835 Log ($jobstepid, "blaming failure on suspect node " .
836 $slot[$proc{$pid}->{slot}]->{node}->{name});
837 $temporary_fail ||= 1;
839 ban_node_by_slot($proc{$pid}->{slot});
842 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
843 ++$Jobstep->{'failures'},
844 $temporary_fail ? 'temporary ' : 'permanent',
847 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
848 # Give up on this task, and the whole job
850 $main::please_freeze = 1;
853 # Put this task back on the todo queue
854 push @jobstep_todo, $jobstepid;
856 $Job->{'tasks_summary'}->{'failed'}++;
860 ++$thisround_succeeded;
861 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
862 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
863 push @jobstep_done, $jobstepid;
864 Log ($jobstepid, "success in $elapsed seconds");
866 $Jobstep->{exitcode} = $childstatus;
867 $Jobstep->{finishtime} = time;
868 process_stderr ($jobstepid, $task_success);
869 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
871 close $reader{$jobstepid};
872 delete $reader{$jobstepid};
873 delete $slot[$proc{$pid}->{slot}]->{pid};
874 push @freeslot, $proc{$pid}->{slot};
878 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
880 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
882 'order' => 'qsequence'
884 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
886 'level' => $arvados_task->{'sequence'},
888 'arvados_task' => $arvados_task
890 push @jobstep, $jobstep;
891 push @jobstep_todo, $#jobstep;
894 $progress_is_dirty = 1;
898 sub check_refresh_wanted
900 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
901 if (@stat && $stat[9] > $latest_refresh) {
902 $latest_refresh = scalar time;
904 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
905 for my $attr ('cancelled_at',
906 'cancelled_by_user_uuid',
907 'cancelled_by_client_uuid') {
908 $Job->{$attr} = $Job2->{$attr};
910 if ($Job->{'cancelled_at'}) {
911 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
912 " by user " . $Job->{cancelled_by_user_uuid});
914 $main::please_freeze = 1;
922 # return if the kill list was checked <4 seconds ago
923 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
927 $squeue_kill_checked = time;
929 # use killem() on procs whose killtime is reached
932 if (exists $proc{$_}->{killtime}
933 && $proc{$_}->{killtime} <= time)
939 # return if the squeue was checked <60 seconds ago
940 if (defined $squeue_checked && $squeue_checked > time - 60)
944 $squeue_checked = time;
948 # here is an opportunity to check for mysterious problems with local procs
952 # get a list of steps still running
953 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
955 if ($squeue[-1] ne "ok")
961 # which of my jobsteps are running, according to squeue?
965 if (/^(\d+)\.(\d+) (\S+)/)
967 if ($1 eq $ENV{SLURM_JOBID})
974 # which of my active child procs (>60s old) were not mentioned by squeue?
977 if ($proc{$_}->{time} < time - 60
978 && !exists $ok{$proc{$_}->{jobstepname}}
979 && !exists $proc{$_}->{killtime})
981 # kill this proc if it hasn't exited in 30 seconds
982 $proc{$_}->{killtime} = time + 30;
988 sub release_allocation
992 Log (undef, "release job allocation");
993 system "scancel $ENV{SLURM_JOBID}";
1001 foreach my $job (keys %reader)
1004 while (0 < sysread ($reader{$job}, $buf, 8192))
1006 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1007 $jobstep[$job]->{stderr} .= $buf;
1008 preprocess_stderr ($job);
1009 if (length ($jobstep[$job]->{stderr}) > 16384)
1011 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1020 sub preprocess_stderr
1024 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1026 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1027 Log ($job, "stderr $line");
1028 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1030 $main::please_freeze = 1;
1032 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1033 $jobstep[$job]->{node_fail} = 1;
1034 ban_node_by_slot($jobstep[$job]->{slotindex});
1043 my $task_success = shift;
1044 preprocess_stderr ($job);
1047 Log ($job, "stderr $_");
1048 } split ("\n", $jobstep[$job]->{stderr});
1054 my $whc = Warehouse->new;
1055 Log (undef, "collate");
1056 $whc->write_start (1);
1060 next if (!exists $_->{'arvados_task'}->{output} ||
1061 !$_->{'arvados_task'}->{'success'} ||
1062 $_->{'exitcode'} != 0);
1063 my $output = $_->{'arvados_task'}->{output};
1064 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1066 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1067 $whc->write_data ($output);
1069 elsif (@jobstep == 1)
1071 $joboutput = $output;
1074 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1076 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1077 $whc->write_data ($outblock);
1081 my $errstr = $whc->errstr;
1082 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1086 $joboutput = $whc->write_finish if !defined $joboutput;
1089 Log (undef, "output $joboutput");
1090 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1094 Log (undef, "output undef");
1104 my $sig = 2; # SIGINT first
1105 if (exists $proc{$_}->{"sent_$sig"} &&
1106 time - $proc{$_}->{"sent_$sig"} > 4)
1108 $sig = 15; # SIGTERM if SIGINT doesn't work
1110 if (exists $proc{$_}->{"sent_$sig"} &&
1111 time - $proc{$_}->{"sent_$sig"} > 4)
1113 $sig = 9; # SIGKILL if SIGTERM doesn't work
1115 if (!exists $proc{$_}->{"sent_$sig"})
1117 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1119 select (undef, undef, undef, 0.1);
1122 kill $sig, $_; # srun wants two SIGINT to really interrupt
1124 $proc{$_}->{"sent_$sig"} = time;
1125 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1135 vec($bits,fileno($_),1) = 1;
1141 sub Log # ($jobstep_id, $logmessage)
1143 if ($_[1] =~ /\n/) {
1144 for my $line (split (/\n/, $_[1])) {
1149 my $fh = select STDERR; $|=1; select $fh;
1150 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1151 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1154 if ($metastream || -t STDERR) {
1155 my @gmtime = gmtime;
1156 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1157 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1159 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1161 return if !$metastream;
1162 $metastream->write_data ($datetime . " " . $message);
1168 my ($package, $file, $line) = caller;
1169 my $message = "@_ at $file line $line\n";
1170 Log (undef, $message);
1171 freeze() if @jobstep_todo;
1172 collate_output() if @jobstep_todo;
1174 save_meta() if $metastream;
1181 return if !$job_has_uuid;
1182 $Job->update_attributes('running' => 0,
1184 'finished_at' => scalar gmtime);
1190 my $justcheckpoint = shift; # false if this will be the last meta saved
1191 my $m = $metastream;
1192 $m = $m->copy if $justcheckpoint;
1194 my $whc = Warehouse->new;
1195 my $loglocator = $whc->store_block ($m->as_string);
1196 $arv->{'collections'}->{'create'}->execute('collection' => {
1197 'uuid' => $loglocator,
1198 'manifest_text' => $m->as_string,
1200 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1201 Log (undef, "log manifest is $loglocator");
1202 $Job->{'log'} = $loglocator;
1203 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1207 sub freeze_if_want_freeze
1209 if ($main::please_freeze)
1211 release_allocation();
1214 # kill some srun procs before freeze+stop
1215 map { $proc{$_} = {} } @_;
1218 killem (keys %proc);
1219 select (undef, undef, undef, 0.1);
1221 while (($died = waitpid (-1, WNOHANG)) > 0)
1223 delete $proc{$died};
1238 Log (undef, "Freeze not implemented");
1245 croak ("Thaw not implemented");
1249 Log (undef, "thaw from $key");
1254 @jobstep_tomerge = ();
1255 $jobstep_tomerge_level = 0;
1258 my $stream = new Warehouse::Stream ( whc => $whc,
1259 hash => [split (",", $key)] );
1261 while (my $dataref = $stream->read_until (undef, "\n\n"))
1263 if ($$dataref =~ /^job /)
1265 foreach (split ("\n", $$dataref))
1267 my ($k, $v) = split ("=", $_, 2);
1268 $frozenjob->{$k} = freezeunquote ($v);
1273 if ($$dataref =~ /^merge (\d+) (.*)/)
1275 $jobstep_tomerge_level = $1;
1277 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1282 foreach (split ("\n", $$dataref))
1284 my ($k, $v) = split ("=", $_, 2);
1285 $Jobstep->{$k} = freezeunquote ($v) if $k;
1287 $Jobstep->{'failures'} = 0;
1288 push @jobstep, $Jobstep;
1290 if ($Jobstep->{exitcode} eq "0")
1292 push @jobstep_done, $#jobstep;
1296 push @jobstep_todo, $#jobstep;
1300 foreach (qw (script script_version script_parameters))
1302 $Job->{$_} = $frozenjob->{$_};
1304 $Job->save if $job_has_uuid;
1320 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1327 my $srunargs = shift;
1328 my $execargs = shift;
1329 my $opts = shift || {};
1331 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1332 print STDERR (join (" ",
1333 map { / / ? "'$_'" : $_ }
1336 if $ENV{CRUNCH_DEBUG};
1338 if (defined $stdin) {
1339 my $child = open STDIN, "-|";
1340 defined $child or die "no fork: $!";
1342 print $stdin or die $!;
1343 close STDOUT or die $!;
1348 return system (@$args) if $opts->{fork};
1351 warn "ENV size is ".length(join(" ",%ENV));
1352 die "exec failed: $!: @$args";
1356 sub ban_node_by_slot {
1357 # Don't start any new jobsteps on this node for 60 seconds
1359 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1360 $slot[$slotid]->{node}->{hold_count}++;
1361 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1367 # checkout-and-build
1371 my $destdir = $ENV{"CRUNCH_SRC"};
1372 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1373 my $repo = $ENV{"CRUNCH_SRC_URL"};
1375 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1377 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1381 unlink "$destdir.commit";
1382 open STDOUT, ">", "$destdir.log";
1383 open STDERR, ">&STDOUT";
1386 my @git_archive_data = <DATA>;
1387 if (@git_archive_data) {
1388 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1389 print TARX @git_archive_data;
1391 die "'tar -C $destdir -xf -' exited $?: $!";
1396 chomp ($pwd = `pwd`);
1397 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1400 for my $src_path ("$destdir/arvados/sdk/python") {
1402 shell_or_die ("virtualenv", $install_dir);
1403 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1407 if (-e "$destdir/crunch_scripts/install") {
1408 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1409 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1411 shell_or_die ("./tests/autotests.sh", $install_dir);
1412 } elsif (-e "./install.sh") {
1413 shell_or_die ("./install.sh", $install_dir);
1417 unlink "$destdir.commit.new";
1418 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1419 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1428 if ($ENV{"DEBUG"}) {
1429 print STDERR "@_\n";
1432 or die "@_ failed: $! exit 0x".sprintf("%x",$?);