2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or deallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
81 use Warehouse::Stream;
82 use IPC::System::Simple qw(capturex);
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88 if ($ENV{"USER"} ne "crunch" && $< != 0) {
89 # use a tmp dir unique for my uid
90 $ENV{"CRUNCH_TMP"} .= "-$<";
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
104 GetOptions('force-unlock' => \$force_unlock,
105 'git-dir=s' => \$git_dir,
106 'job=s' => \$jobspec,
107 'job-api-token=s' => \$job_api_token,
108 'no-clear-tmp' => \$no_clear_tmp,
109 'resume-stash=s' => \$resume_stash,
112 if (defined $job_api_token) {
113 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
123 $main::ENV{CRUNCH_DEBUG} = 1;
127 $main::ENV{CRUNCH_DEBUG} = 0;
132 my $arv = Arvados->new('apiVersion' => 'v1');
135 my $User = $arv->{'users'}->{'current'}->execute;
143 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Refuse to (re)run a job that is already locked, finished, running, or
# started -- unless the operator passed --force-unlock to steal the lock.
144 if (!$force_unlock) {
145 if ($Job->{'is_locked_by_uuid'}) {
146 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# NOTE(review): `ne undef` stringifies undef to "" (warning under `use
# warnings`) and compares strings; `defined $Job->{'success'}` appears to be
# the intended "flag is not null" test -- confirm before changing.
148 if ($Job->{'success'} ne undef) {
149 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
151 if ($Job->{'running'}) {
152 croak("Job 'running' flag is already set");
154 if ($Job->{'started_at'}) {
155 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
161 $Job = JSON::decode_json($jobspec);
165 map { croak ("No $_ specified") unless $Job->{$_} }
166 qw(script script_version script_parameters);
169 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170 $Job->{'started_at'} = gmtime;
172 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
176 $job_id = $Job->{'uuid'};
178 $metastream = Warehouse::Stream->new(whc => new Warehouse);
180 $metastream->name('.');
181 $metastream->write_start($job_id . '.log.txt');
184 $Job->{'runtime_constraints'} ||= {};
185 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
186 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
189 Log (undef, "check slurm allocation");
192 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
196 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
197 push @sinfo, "$localcpus localhost";
199 if (exists $ENV{SLURM_NODELIST})
201 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
205 my ($ncpus, $slurm_nodelist) = split;
206 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
209 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
212 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
215 foreach (split (",", $ranges))
228 push @nodelist, map {
230 $n =~ s/\[[-,\d]+\]/$_/;
237 push @nodelist, $nodelist;
240 foreach my $nodename (@nodelist)
242 Log (undef, "node $nodename - $ncpus slots");
243 my $node = { name => $nodename,
247 foreach my $cpu (1..$ncpus)
249 push @slot, { node => $node,
253 push @node, @nodelist;
258 # Ensure that we get one jobstep running on each allocated node before
259 # we start overloading nodes with concurrent steps
261 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
268 # Claim this job, and make sure nobody else does
# NOTE(review): `==` forces numeric comparison, so two different UUID strings
# both numify (typically to their leading digits or 0) and can compare equal
# spuriously -- string `eq` looks like the intended check here. Confirm.
269 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
270 $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
271 croak("Error while updating / locking job");
# Record the start time (and initial task counters) on the job record.
273 $Job->update_attributes('started_at' => scalar gmtime,
276 'tasks_summary' => { 'failed' => 0,
283 Log (undef, "start");
# Install signal handlers. Handlers only set $main::please_* flags; the main
# scheduling loop polls those flags between its other work (see the POD above:
# INT/QUIT/TSTP checkpoint and stop or pause, ALRM dumps info, CONT resumes,
# HUP refreshes node allocation and job attributes).
284 $SIG{'INT'} = sub { $main::please_freeze = 1; };
285 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
286 $SIG{'TERM'} = \&croak;
287 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
288 $SIG{'ALRM'} = sub { $main::please_info = 1; };
289 $SIG{'CONT'} = sub { $main::please_continue = 1; };
290 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
292 $main::please_freeze = 0;
293 $main::please_info = 0;
294 $main::please_continue = 0;
295 $main::please_refresh = 0;
296 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
298 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
299 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
300 $ENV{"JOB_UUID"} = $job_id;
304 my @jobstep_todo = ();
305 my @jobstep_done = ();
306 my @jobstep_tomerge = ();
307 my $jobstep_tomerge_level = 0;
309 my $squeue_kill_checked;
310 my $output_in_keep = 0;
311 my $latest_refresh = scalar time;
315 if (defined $Job->{thawedfromkey})
317 thaw ($Job->{thawedfromkey});
321 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
322 'job_uuid' => $Job->{'uuid'},
327 push @jobstep, { 'level' => 0,
329 'arvados_task' => $first_task,
331 push @jobstep_todo, 0;
337 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
344 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
346 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
349 if (!defined $no_clear_tmp) {
350 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
351 system($clear_tmp_cmd) == 0
352 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
354 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
355 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
357 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
358 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
359 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
361 or croak ("setup.py in $src_path failed: exit ".($?>>8));
369 $build_script = <DATA>;
371 Log (undef, "Install revision ".$Job->{script_version});
372 my $nodelist = join(",", @node);
374 if (!defined $no_clear_tmp) {
375 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
377 my $cleanpid = fork();
380 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
381 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
386 last if $cleanpid == waitpid (-1, WNOHANG);
387 freeze_if_want_freeze ($cleanpid);
388 select (undef, undef, undef, 0.1);
390 Log (undef, "Clean-work-dir exited $?");
393 # Install requested code version
396 my @srunargs = ("srun",
397 "--nodelist=$nodelist",
398 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
400 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
401 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
405 my $treeish = $Job->{'script_version'};
406 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
407 # Todo: let script_version specify repository instead of expecting
408 # parent process to figure it out.
409 $ENV{"CRUNCH_SRC_URL"} = $repo;
411 # Create/update our clone of the remote git repo
413 if (!-d $ENV{"CRUNCH_SRC"}) {
414 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
415 or croak ("git clone $repo failed: exit ".($?>>8));
416 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
418 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
420 # If this looks like a subversion r#, look for it in git-svn commit messages
422 if ($treeish =~ m{^\d{1,4}$}) {
423 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
425 if ($gitlog =~ /^[a-f0-9]{40}$/) {
427 Log (undef, "Using commit $commit for script_version $treeish");
431 # If that didn't work, try asking git to look it up as a tree-ish.
433 if (!defined $commit) {
435 my $cooked_treeish = $treeish;
436 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
437 # Looks like a git branch name -- make sure git knows it's
438 # relative to the remote repo
439 $cooked_treeish = "origin/$treeish";
442 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
444 if ($found =~ /^[0-9a-f]{40}$/s) {
446 if ($commit ne $treeish) {
447 # Make sure we record the real commit id in the database,
448 # frozentokey, logs, etc. -- instead of an abbreviation or a
449 # branch name which can become ambiguous or point to a
450 # different commit in the future.
451 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
452 Log (undef, "Using commit $commit for tree-ish $treeish");
453 if ($commit ne $treeish) {
454 $Job->{'script_version'} = $commit;
456 $Job->update_attributes('script_version' => $commit) or
457 croak("Error while updating job");
463 if (defined $commit) {
464 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
465 @execargs = ("sh", "-c",
466 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
467 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
470 croak ("could not figure out commit id for $treeish");
473 my $installpid = fork();
474 if ($installpid == 0)
476 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
481 last if $installpid == waitpid (-1, WNOHANG);
482 freeze_if_want_freeze ($installpid);
483 select (undef, undef, undef, 0.1);
485 Log (undef, "Install exited $?");
490 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
491 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
496 foreach (qw (script script_version script_parameters runtime_constraints))
500 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
502 foreach (split (/\n/, $Job->{knobs}))
504 Log (undef, "knob " . $_);
509 $main::success = undef;
515 my $thisround_succeeded = 0;
516 my $thisround_failed = 0;
517 my $thisround_failed_multiple = 0;
519 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
520 or $a <=> $b } @jobstep_todo;
521 my $level = $jobstep[$jobstep_todo[0]]->{level};
522 Log (undef, "start level $level");
527 my @freeslot = (0..$#slot);
530 my $progress_is_dirty = 1;
531 my $progress_stats_updated = 0;
533 update_progress_stats();
538 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
540 my $id = $jobstep_todo[$todo_ptr];
541 my $Jobstep = $jobstep[$id];
542 if ($Jobstep->{level} != $level)
547 pipe $reader{$id}, "writer" or croak ($!);
548 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
549 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
551 my $childslot = $freeslot[0];
552 my $childnode = $slot[$childslot]->{node};
553 my $childslotname = join (".",
554 $slot[$childslot]->{node}->{name},
555 $slot[$childslot]->{cpu});
556 my $childpid = fork();
559 $SIG{'INT'} = 'DEFAULT';
560 $SIG{'QUIT'} = 'DEFAULT';
561 $SIG{'TERM'} = 'DEFAULT';
563 foreach (values (%reader))
567 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
568 open(STDOUT,">&writer");
569 open(STDERR,">&writer");
574 delete $ENV{"GNUPGHOME"};
575 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
576 $ENV{"TASK_QSEQUENCE"} = $id;
577 $ENV{"TASK_SEQUENCE"} = $level;
578 $ENV{"JOB_SCRIPT"} = $Job->{script};
579 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
580 $param =~ tr/a-z/A-Z/;
581 $ENV{"JOB_PARAMETER_$param"} = $value;
583 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
584 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
585 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
586 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
587 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
588 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
589 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
595 "--nodelist=".$childnode->{name},
596 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
597 "--job-name=$job_id.$id.$$",
599 my @execargs = qw(sh);
600 my $build_script_to_send = "";
602 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
603 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
604 ."&& cd $ENV{CRUNCH_TMP} ";
607 $build_script_to_send = $build_script;
612 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
613 my @execargs = ('bash', '-c', $command);
614 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
618 if (!defined $childpid)
625 $proc{$childpid} = { jobstep => $id,
628 jobstepname => "$job_id.$id.$childpid",
630 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
631 $slot[$childslot]->{pid} = $childpid;
633 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
634 Log ($id, "child $childpid started on $childslotname");
635 $Jobstep->{starttime} = time;
636 $Jobstep->{node} = $childnode->{name};
637 $Jobstep->{slotindex} = $childslot;
638 delete $Jobstep->{stderr};
639 delete $Jobstep->{finishtime};
641 splice @jobstep_todo, $todo_ptr, 1;
644 $progress_is_dirty = 1;
648 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
650 last THISROUND if $main::please_freeze;
651 if ($main::please_info)
653 $main::please_info = 0;
657 update_progress_stats();
664 check_refresh_wanted();
666 update_progress_stats();
667 select (undef, undef, undef, 0.1);
669 elsif (time - $progress_stats_updated >= 30)
671 update_progress_stats();
673 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
674 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
676 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
677 .($thisround_failed+$thisround_succeeded)
678 .") -- giving up on this round";
679 Log (undef, $message);
683 # move slots from freeslot to holdslot (or back to freeslot) if necessary
684 for (my $i=$#freeslot; $i>=0; $i--) {
685 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
686 push @holdslot, (splice @freeslot, $i, 1);
689 for (my $i=$#holdslot; $i>=0; $i--) {
690 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
691 push @freeslot, (splice @holdslot, $i, 1);
695 # give up if no nodes are succeeding
696 if (!grep { $_->{node}->{losing_streak} == 0 &&
697 $_->{node}->{hold_count} < 4 } @slot) {
698 my $message = "Every node has failed -- giving up on this round";
699 Log (undef, $message);
706 push @freeslot, splice @holdslot;
707 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
710 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
713 if ($main::please_continue) {
714 $main::please_continue = 0;
717 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
721 check_refresh_wanted();
723 update_progress_stats();
724 select (undef, undef, undef, 0.1);
725 killem (keys %proc) if $main::please_freeze;
729 update_progress_stats();
730 freeze_if_want_freeze();
733 if (!defined $main::success)
736 $thisround_succeeded == 0 &&
737 ($thisround_failed == 0 || $thisround_failed > 4))
739 my $message = "stop because $thisround_failed tasks failed and none succeeded";
740 Log (undef, $message);
749 goto ONELEVEL if !defined $main::success;
752 release_allocation();
755 $Job->update_attributes('output' => &collate_output(),
757 'success' => $Job->{'output'} && $main::success,
758 'finished_at' => scalar gmtime)
761 if ($Job->{'output'})
764 my $manifest_text = capturex("whget", $Job->{'output'});
765 $arv->{'collections'}->{'create'}->execute('collection' => {
766 'uuid' => $Job->{'output'},
767 'manifest_text' => $manifest_text,
771 Log (undef, "Failed to register output manifest: $@");
775 Log (undef, "finish");
# Push todo/done/running task counts to the Job record and log a one-line
# status summary. Cheap no-op unless $progress_is_dirty was set since the
# last call; always refreshes $progress_stats_updated so callers can
# rate-limit on wall-clock time.
782 sub update_progress_stats
784 $progress_stats_updated = time;
785 return if !$progress_is_dirty;
# "running" = occupied slots: total minus free minus on-hold.
786 my ($todo, $done, $running) = (scalar @jobstep_todo,
787 scalar @jobstep_done,
788 scalar @slot - scalar @freeslot - scalar @holdslot);
789 $Job->{'tasks_summary'} ||= {};
790 $Job->{'tasks_summary'}->{'todo'} = $todo;
791 $Job->{'tasks_summary'}->{'done'} = $done;
792 $Job->{'tasks_summary'}->{'running'} = $running;
794 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
796 Log (undef, "status: $done done, $running running, $todo todo");
797 $progress_is_dirty = 0;
804 my $pid = waitpid (-1, WNOHANG);
805 return 0 if $pid <= 0;
807 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
809 . $slot[$proc{$pid}->{slot}]->{cpu});
810 my $jobstepid = $proc{$pid}->{jobstep};
811 my $elapsed = time - $proc{$pid}->{time};
812 my $Jobstep = $jobstep[$jobstepid];
814 my $childstatus = $?;
815 my $exitvalue = $childstatus >> 8;
816 my $exitinfo = sprintf("exit %d signal %d%s",
819 ($childstatus & 128 ? ' core dump' : ''));
820 $Jobstep->{'arvados_task'}->reload;
821 my $task_success = $Jobstep->{'arvados_task'}->{success};
823 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
825 if (!defined $task_success) {
826 # task did not indicate one way or the other --> fail
827 $Jobstep->{'arvados_task'}->{success} = 0;
828 $Jobstep->{'arvados_task'}->save;
835 $temporary_fail ||= $Jobstep->{node_fail};
836 $temporary_fail ||= ($exitvalue == 111);
839 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
841 # Check for signs of a failed or misconfigured node
842 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
843 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
844 # Don't count this against jobstep failure thresholds if this
845 # node is already suspected faulty and srun exited quickly
846 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
848 Log ($jobstepid, "blaming failure on suspect node " .
849 $slot[$proc{$pid}->{slot}]->{node}->{name});
850 $temporary_fail ||= 1;
852 ban_node_by_slot($proc{$pid}->{slot});
855 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
856 ++$Jobstep->{'failures'},
857 $temporary_fail ? 'temporary ' : 'permanent',
860 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
861 # Give up on this task, and the whole job
863 $main::please_freeze = 1;
866 # Put this task back on the todo queue
867 push @jobstep_todo, $jobstepid;
869 $Job->{'tasks_summary'}->{'failed'}++;
873 ++$thisround_succeeded;
874 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
875 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
876 push @jobstep_done, $jobstepid;
877 Log ($jobstepid, "success in $elapsed seconds");
879 $Jobstep->{exitcode} = $childstatus;
880 $Jobstep->{finishtime} = time;
881 process_stderr ($jobstepid, $task_success);
882 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
884 close $reader{$jobstepid};
885 delete $reader{$jobstepid};
886 delete $slot[$proc{$pid}->{slot}]->{pid};
887 push @freeslot, $proc{$pid}->{slot};
891 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
893 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
895 'order' => 'qsequence'
897 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
899 'level' => $arvados_task->{'sequence'},
901 'arvados_task' => $arvados_task
903 push @jobstep, $jobstep;
904 push @jobstep_todo, $#jobstep;
907 $progress_is_dirty = 1;
# If the refresh trigger file ($ENV{CRUNCH_REFRESH_TRIGGER}) has been touched
# since our last refresh, re-fetch the Job record from the API server, copy
# over the cancellation attributes, and request a freeze if the job has been
# cancelled (per the POD: attributes "that should affect behavior").
911 sub check_refresh_wanted
# $stat[9] is the trigger file's mtime.
913 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
914 if (@stat && $stat[9] > $latest_refresh) {
915 $latest_refresh = scalar time;
917 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
918 for my $attr ('cancelled_at',
919 'cancelled_by_user_uuid',
920 'cancelled_by_client_uuid') {
921 $Job->{$attr} = $Job2->{$attr};
923 if ($Job->{'cancelled_at'}) {
924 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
925 " by user " . $Job->{cancelled_by_user_uuid});
# Freeze is how the main loop winds down running tasks and stops.
927 $main::please_freeze = 1;
935 # return if the kill list was checked <4 seconds ago
936 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
940 $squeue_kill_checked = time;
942 # use killem() on procs whose killtime is reached
945 if (exists $proc{$_}->{killtime}
946 && $proc{$_}->{killtime} <= time)
952 # return if the squeue was checked <60 seconds ago
953 if (defined $squeue_checked && $squeue_checked > time - 60)
957 $squeue_checked = time;
961 # here is an opportunity to check for mysterious problems with local procs
965 # get a list of steps still running
966 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
968 if ($squeue[-1] ne "ok")
974 # which of my jobsteps are running, according to squeue?
978 if (/^(\d+)\.(\d+) (\S+)/)
980 if ($1 eq $ENV{SLURM_JOBID})
987 # which of my active child procs (>60s old) were not mentioned by squeue?
990 if ($proc{$_}->{time} < time - 60
991 && !exists $ok{$proc{$_}->{jobstepname}}
992 && !exists $proc{$_}->{killtime})
994 # kill this proc if it hasn't exited in 30 seconds
995 $proc{$_}->{killtime} = time + 30;
1001 sub release_allocation
1005 Log (undef, "release job allocation");
1006 system "scancel $ENV{SLURM_JOBID}";
1014 foreach my $job (keys %reader)
1017 while (0 < sysread ($reader{$job}, $buf, 8192))
1019 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1020 $jobstep[$job]->{stderr} .= $buf;
1021 preprocess_stderr ($job);
1022 if (length ($jobstep[$job]->{stderr}) > 16384)
1024 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's accumulated stderr buffer, logging
# each one and reacting to known srun error messages: an expired/unconfirmed
# SLURM allocation triggers a freeze; a node failure marks the step as a
# node_fail and bans the node's slots for a while.
1033 sub preprocess_stderr
1037 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the line (plus its newline) from the front of the buffer.
1039 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1040 Log ($job, "stderr $line");
1041 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
# Our allocation is gone -- checkpoint and stop.
1043 $main::please_freeze = 1;
1045 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1046 $jobstep[$job]->{node_fail} = 1;
1047 ban_node_by_slot($jobstep[$job]->{slotindex});
1056 my $task_success = shift;
1057 preprocess_stderr ($job);
1060 Log ($job, "stderr $_");
1061 } split ("\n", $jobstep[$job]->{stderr});
1067 my $whc = Warehouse->new;
1068 Log (undef, "collate");
1069 $whc->write_start (1);
1073 next if (!exists $_->{'arvados_task'}->{output} ||
1074 !$_->{'arvados_task'}->{'success'} ||
1075 $_->{'exitcode'} != 0);
1076 my $output = $_->{'arvados_task'}->{output};
1077 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1079 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1080 $whc->write_data ($output);
1082 elsif (@jobstep == 1)
1084 $joboutput = $output;
1087 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1089 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1090 $whc->write_data ($outblock);
1094 my $errstr = $whc->errstr;
1095 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1099 $joboutput = $whc->write_finish if !defined $joboutput;
1102 Log (undef, "output $joboutput");
1103 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1107 Log (undef, "output undef");
# Signal-escalation ladder (inside killem): start with SIGINT; if a given
# signal was already sent to this pid more than 4 seconds ago without the
# process exiting, escalate SIGINT -> SIGTERM -> SIGKILL.
1117 my $sig = 2; # SIGINT first
1118 if (exists $proc{$_}->{"sent_$sig"} &&
1119 time - $proc{$_}->{"sent_$sig"} > 4)
1121 $sig = 15; # SIGTERM if SIGINT doesn't work
1123 if (exists $proc{$_}->{"sent_$sig"} &&
1124 time - $proc{$_}->{"sent_$sig"} > 4)
1126 $sig = 9; # SIGKILL if SIGTERM doesn't work
# Only send each signal level once; record when it was sent so the
# 4-second escalation timer above can fire on the next call.
1128 if (!exists $proc{$_}->{"sent_$sig"})
1130 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1132 select (undef, undef, undef, 0.1);
1135 kill $sig, $_; # srun wants two SIGINT to really interrupt
1137 $proc{$_}->{"sent_$sig"} = time;
1138 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1148 vec($bits,fileno($_),1) = 1;
# Write a log message to stderr and, when available, to the Keep-backed
# $metastream. Non-printable characters are escaped as \NNN octal so each
# message stays on one line. A message containing newlines is split into
# per-line messages (presumably logged one at a time -- the elided body of
# this branch is not visible here).
1154 sub Log # ($jobstep_id, $logmessage)
1156 if ($_[1] =~ /\n/) {
1157 for my $line (split (/\n/, $_[1])) {
# Unbuffer STDERR so interleaved task output stays readable.
1162 my $fh = select STDERR; $|=1; select $fh;
1163 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape everything outside printable ASCII (\040 space .. \176 tilde).
1164 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# Timestamp only matters for the metastream or an interactive terminal.
1167 if ($metastream || -t STDERR) {
1168 my @gmtime = gmtime;
1169 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1170 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1172 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1174 return if !$metastream;
1175 $metastream->write_data ($datetime . " " . $message);
1181 my ($package, $file, $line) = caller;
1182 my $message = "@_ at $file line $line\n";
1183 Log (undef, $message);
1184 freeze() if @jobstep_todo;
1185 collate_output() if @jobstep_todo;
1187 save_meta() if $metastream;
1194 return if !$job_has_uuid;
1195 $Job->update_attributes('running' => 0,
1197 'finished_at' => scalar gmtime);
1203 my $justcheckpoint = shift; # false if this will be the last meta saved
1204 my $m = $metastream;
1205 $m = $m->copy if $justcheckpoint;
1207 my $whc = Warehouse->new;
1208 my $loglocator = $whc->store_block ($m->as_string);
1209 $arv->{'collections'}->{'create'}->execute('collection' => {
1210 'uuid' => $loglocator,
1211 'manifest_text' => $m->as_string,
1213 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1214 Log (undef, "log manifest is $loglocator");
1215 $Job->{'log'} = $loglocator;
1216 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1220 sub freeze_if_want_freeze
1222 if ($main::please_freeze)
1224 release_allocation();
1227 # kill some srun procs before freeze+stop
1228 map { $proc{$_} = {} } @_;
1231 killem (keys %proc);
1232 select (undef, undef, undef, 0.1);
1234 while (($died = waitpid (-1, WNOHANG)) > 0)
1236 delete $proc{$died};
1251 Log (undef, "Freeze not implemented");
1258 croak ("Thaw not implemented");
1262 Log (undef, "thaw from $key");
1267 @jobstep_tomerge = ();
1268 $jobstep_tomerge_level = 0;
1271 my $stream = new Warehouse::Stream ( whc => $whc,
1272 hash => [split (",", $key)] );
1274 while (my $dataref = $stream->read_until (undef, "\n\n"))
1276 if ($$dataref =~ /^job /)
1278 foreach (split ("\n", $$dataref))
1280 my ($k, $v) = split ("=", $_, 2);
1281 $frozenjob->{$k} = freezeunquote ($v);
1286 if ($$dataref =~ /^merge (\d+) (.*)/)
1288 $jobstep_tomerge_level = $1;
1290 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1295 foreach (split ("\n", $$dataref))
1297 my ($k, $v) = split ("=", $_, 2);
1298 $Jobstep->{$k} = freezeunquote ($v) if $k;
1300 $Jobstep->{'failures'} = 0;
1301 push @jobstep, $Jobstep;
1303 if ($Jobstep->{exitcode} eq "0")
1305 push @jobstep_done, $#jobstep;
1309 push @jobstep_todo, $#jobstep;
1313 foreach (qw (script script_version script_parameters))
1315 $Job->{$_} = $frozenjob->{$_};
1317 $Job->save if $job_has_uuid;
1333 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1340 my $srunargs = shift;
1341 my $execargs = shift;
1342 my $opts = shift || {};
1344 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1345 print STDERR (join (" ",
1346 map { / / ? "'$_'" : $_ }
1349 if $ENV{CRUNCH_DEBUG};
1351 if (defined $stdin) {
1352 my $child = open STDIN, "-|";
1353 defined $child or die "no fork: $!";
1355 print $stdin or die $!;
1356 close STDOUT or die $!;
1361 return system (@$args) if $opts->{fork};
1364 warn "ENV size is ".length(join(" ",%ENV));
1365 die "exec failed: $!: @$args";
# ban_node_by_slot($slotid): temporarily take the node hosting the given
# slot out of rotation after a failure, by setting its hold_until deadline
# and bumping its hold_count (used elsewhere to give up on chronically
# failing nodes).
1369 sub ban_node_by_slot {
1370 # Don't start any new jobsteps on this node for 60 seconds
1372 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1373 $slot[$slotid]->{node}->{hold_count}++;
1374 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# must_lock_now($lockfile, $error_message): take a non-blocking exclusive
# flock on $lockfile, croaking with $error_message if another crunch-job
# already holds it. The global bareword handle L is deliberately left open
# so the lock is held for the life of this process.
1379 my ($lockfile, $error_message) = @_;
1380 open L, ">", $lockfile or croak("$lockfile: $!");
# NOTE(review): LOCK_EX/LOCK_NB require Fcntl's :flock constants; only
# `use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK)` is visible above -- confirm
# the constants are imported (or defined) elsewhere in the file.
1381 if (!flock L, LOCK_EX|LOCK_NB) {
1382 croak("Can't lock $lockfile: $error_message\n");
1389 # checkout-and-build
1393 my $destdir = $ENV{"CRUNCH_SRC"};
1394 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1395 my $repo = $ENV{"CRUNCH_SRC_URL"};
1397 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1399 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1403 unlink "$destdir.commit";
1404 open STDOUT, ">", "$destdir.log";
1405 open STDERR, ">&STDOUT";
1408 my @git_archive_data = <DATA>;
1409 if (@git_archive_data) {
1410 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1411 print TARX @git_archive_data;
1413 die "'tar -C $destdir -xf -' exited $?: $!";
1418 chomp ($pwd = `pwd`);
1419 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1422 for my $src_path ("$destdir/arvados/sdk/python") {
1424 shell_or_die ("virtualenv", $install_dir);
1425 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1429 if (-e "$destdir/crunch_scripts/install") {
1430 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1431 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1433 shell_or_die ("./tests/autotests.sh", $install_dir);
1434 } elsif (-e "./install.sh") {
1435 shell_or_die ("./install.sh", $install_dir);
1439 unlink "$destdir.commit.new";
1440 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1441 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1450 if ($ENV{"DEBUG"}) {
1451 print STDERR "@_\n";
1454 or die "@_ failed: $! exit 0x".sprintf("%x",$?);