2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or deallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
81 use Warehouse::Stream;
82 use IPC::System::Simple qw(capturex);
# Choose the scratch locations used for this run and export them through the
# environment.  TMPDIR defaults to /tmp; CRUNCH_TMP, unless already set,
# becomes "$TMPDIR/crunch-job".
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# $< is the real uid: any user other than "crunch" or root gets a uid suffix.
88 if ($ENV{"USER"} ne "crunch" && $< != 0) {
89 # use a tmp dir unique for my uid
90 $ENV{"CRUNCH_TMP"} .= "-$<";
# Paths derived from CRUNCH_TMP: the job work area and the install prefix.
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# The result of mkdir is not checked here.
96 mkdir ($ENV{"JOB_WORK"});
# Parse the command-line options into their corresponding variables.
104 GetOptions('force-unlock' => \$force_unlock,
105 'git-dir=s' => \$git_dir,
106 'job=s' => \$jobspec,
107 'job-api-token=s' => \$job_api_token,
108 'no-clear-tmp' => \$no_clear_tmp,
109 'resume-stash=s' => \$resume_stash,
# A token given with --job-api-token overrides ARVADOS_API_TOKEN.
112 if (defined $job_api_token) {
113 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# Slurm is considered available only when both variables are present.
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A jobspec consisting solely of lowercase letters, digits and hyphens is
# taken to be a job UUID; anything else makes this a local job.
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
# CRUNCH_DEBUG is set to 1 or 0; the condition selecting between the two
# assignments is not visible in this excerpt.
123 $main::ENV{CRUNCH_DEBUG} = 1;
127 $main::ENV{CRUNCH_DEBUG} = 0;
# API client handle and the record of the user the token belongs to.
132 my $arv = Arvados->new('apiVersion' => 'v1');
135 my $User = $arv->{'users'}->{'current'}->execute;
# Job identified by UUID: fetch its record from the API server.  Unless
# --force-unlock was given, refuse to run a job that is already locked,
# finished, running or started.
143 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144 if (!$force_unlock) {
145 if ($Job->{'is_locked_by_uuid'}) {
146 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# Test for a non-null 'success' with defined(): comparing against undef with
# "ne" stringifies undef to "" (a warning under "use warnings") and cannot
# tell a null value from an empty string.
148 if (defined $Job->{'success'}) {
149 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
151 if ($Job->{'running'}) {
152 croak("Job 'running' flag is already set");
154 if ($Job->{'started_at'}) {
155 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local job: the jobspec is a JSON document describing the job.
161 $Job = JSON::decode_json($jobspec);
# Abort if any of the required attributes is missing or false.
165 map { croak ("No $_ specified") unless $Job->{$_} }
166 qw(script script_version script_parameters);
# Mark the job as locked by the current user and started now, then create
# its record through the API.
169 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170 $Job->{'started_at'} = gmtime;
172 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
176 $job_id = $Job->{'uuid'};
# Stream that accumulates the job log as "<job uuid>.log.txt"; Log() writes
# to it whenever it is set.
178 $metastream = Warehouse::Stream->new(whc => new Warehouse);
180 $metastream->name('.');
181 $metastream->write_start($job_id . '.log.txt');
# Default the runtime constraints.  A max_tasks_per_node of 0 is false, so
# code that tests $max_ncpus for truth applies no cap.
184 $Job->{'runtime_constraints'} ||= {};
185 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
186 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Build the node list (@node) and the table of task slots (@slot), one slot
# per usable CPU on each node.
189 Log (undef, "check slurm allocation");
192 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Local case: count "processor" lines in /proc/cpuinfo, falling back to 1.
196 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
197 push @sinfo, "$localcpus localhost";
# Slurm case: ask sinfo for "<cpus> <nodelist>" lines for the allocated nodes.
199 if (exists $ENV{SLURM_NODELIST})
201 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Each entry is a CPU count followed by a node list; the count is capped at
# max_tasks_per_node when that constraint is non-zero.
205 my ($ncpus, $slurm_nodelist) = split;
206 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated entry (optionally carrying a [...] suffix) off the
# front of the list per iteration.
209 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
# Entries with a bracketed list of numbers and numeric ranges are expanded
# into individual node names; others are used as-is.
212 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
215 foreach (split (",", $ranges))
228 push @nodelist, map {
230 $n =~ s/\[[-,\d]+\]/$_/;
237 push @nodelist, $nodelist;
# Register every node and create one slot per CPU on it.
240 foreach my $nodename (@nodelist)
242 Log (undef, "node $nodename - $ncpus slots");
243 my $node = { name => $nodename,
247 foreach my $cpu (1..$ncpus)
249 push @slot, { node => $node,
253 push @node, @nodelist;
258 # Ensure that we get one jobstep running on each allocated node before
259 # we start overloading nodes with concurrent steps
261 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
# Take the job lock by writing our user UUID to is_locked_by_uuid, then verify
# that the record really names us as the holder.  UUIDs are strings, so the
# comparison must use "eq": with numeric "==" both sides are evaluated as
# numbers and two different non-numeric UUIDs compare equal, which would make
# this check pass no matter who holds the lock.
268 # Claim this job, and make sure nobody else does
269 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
270 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
271 croak("Error while updating / locking job");
# Record the start time and an initial tasks_summary on the job record.
273 $Job->update_attributes('started_at' => scalar gmtime,
276 'tasks_summary' => { 'failed' => 0,
283 Log (undef, "start");
# Signal handlers only set flags in %main::; the flags are examined later by
# the scheduling code.  TERM goes straight to croak.
284 $SIG{'INT'} = sub { $main::please_freeze = 1; };
285 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
286 $SIG{'TERM'} = \&croak;
287 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
288 $SIG{'ALRM'} = sub { $main::please_info = 1; };
289 $SIG{'CONT'} = sub { $main::please_continue = 1; };
290 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
292 $main::please_freeze = 0;
293 $main::please_info = 0;
294 $main::please_continue = 0;
295 $main::please_refresh = 0;
296 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy NOCACHE*=value lines from the job's knobs into the environment.
298 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
299 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
300 $ENV{"JOB_UUID"} = $job_id;
# Scheduling state.  @jobstep_todo and @jobstep_done hold indexes into @jobstep.
304 my @jobstep_todo = ();
305 my @jobstep_done = ();
306 my @jobstep_tomerge = ();
307 my $jobstep_tomerge_level = 0;
309 my $squeue_kill_checked;
310 my $output_in_keep = 0;
311 my $latest_refresh = scalar time;
# A job that names a thawedfromkey has its state restored from that key.
315 if (defined $Job->{thawedfromkey})
317 thaw ($Job->{thawedfromkey});
# Create the initial job task through the API and queue it as jobstep 0 at
# level 0.
321 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
322 'job_uuid' => $Job->{'uuid'},
327 push @jobstep, { 'level' => 0,
329 'arvados_task' => $first_task,
331 push @jobstep_todo, 0;
# Take the lock file under CRUNCH_TMP; the second argument is the explanation
# used in the error message if the lock cannot be obtained.
337 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# Source setup for the case where the code is used in place.
344 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# The install step is skipped for a local job whose script_version is an
# absolute path.
346 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
# Unless --no-clear-tmp was given, remove the work, opt and src* directories.
# The command string is single-quoted, so the shell expands the variables
# from the environment.
349 if (!defined $no_clear_tmp) {
350 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
351 system($clear_tmp_cmd) == 0
352 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
# Use script_version itself as the source tree, then build and install the
# Python SDK found in it into a virtualenv at $CRUNCH_TMP/opt.
354 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
355 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
357 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
358 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
359 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
361 or croak ("setup.py in $src_path failed: exit ".($?>>8));
# Read the build script from the DATA handle.
369 $build_script = <DATA>;
# Prepare the nodes for installation: optionally wipe the per-job directories
# on every node in the allocation.
371 Log (undef, "Install revision ".$Job->{script_version});
372 my $nodelist = join(",", @node);
374 if (!defined $no_clear_tmp) {
375 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
# The cleanup command is launched from a forked process via srun(): unmount
# anything mounted under $JOB_WORK, then delete the work/opt/src* trees.
377 my $cleanpid = fork();
380 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
381 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# Poll every 0.1 s until the cleanup process has been reaped, honouring a
# freeze request in the meantime.
386 last if $cleanpid == waitpid (-1, WNOHANG);
387 freeze_if_want_freeze ($cleanpid);
388 select (undef, undef, undef, 0.1);
390 Log (undef, "Clean-work-dir exited $?");
# Resolve script_version to a full commit id in a local clone of the
# repository, then ship the build script plus a "git archive" of that commit
# to the nodes through srun.
393 # Install requested code version
396 my @srunargs = ("srun",
397 "--nodelist=$nodelist",
398 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
400 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
401 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
# The repository comes from --git-dir, else from CRUNCH_DEFAULT_GIT_DIR.
405 my $treeish = $Job->{'script_version'};
406 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
407 # Todo: let script_version specify repository instead of expecting
408 # parent process to figure it out.
409 $ENV{"CRUNCH_SRC_URL"} = $repo;
411 # Create/update our clone of the remote git repo
413 if (!-d $ENV{"CRUNCH_SRC"}) {
414 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
415 or croak ("git clone $repo failed: exit ".($?>>8));
416 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
# Point origin at the repository URL (expanded by the shell from the
# environment) and fetch; the command's output and status are discarded.
418 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q --tags origin`;
420 # If this looks like a subversion r#, look for it in git-svn commit messages
# "Looks like" means one to four decimal digits.
422 if ($treeish =~ m{^\d{1,4}$}) {
423 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
425 if ($gitlog =~ /^[a-f0-9]{40}$/) {
427 Log (undef, "Using commit $commit for script_version $treeish");
431 # If that didn't work, try asking git to look it up as a tree-ish.
433 if (!defined $commit) {
435 my $cooked_treeish = $treeish;
# Anything that is not at least five hex digits is treated as a branch name.
436 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
437 # Looks like a git branch name -- make sure git knows it's
438 # relative to the remote repo
439 $cooked_treeish = "origin/$treeish";
# NOTE(review): $cooked_treeish is interpolated into the shell command line
# unquoted -- confirm that script_version is trusted input.
442 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
444 if ($found =~ /^[0-9a-f]{40}$/s) {
446 if ($commit ne $treeish) {
447 # Make sure we record the real commit id in the database,
448 # frozentokey, logs, etc. -- instead of an abbreviation or a
449 # branch name which can become ambiguous or point to a
450 # different commit in the future.
451 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
452 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this repeats the enclosing test; neither variable changes in
# between, so it is always true here.
453 if ($commit ne $treeish) {
454 $Job->{'script_version'} = $commit;
456 $Job->update_attributes('script_version' => $commit) or
457 croak("Error while updating job");
# With a commit in hand, prepare the remote command ("perl -" reads the build
# script from stdin) and capture the source tree as a git archive.
463 if (defined $commit) {
464 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
465 @execargs = ("sh", "-c",
466 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
467 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
470 croak ("could not figure out commit id for $treeish");
# Run the install in a forked process, feeding it the build script followed
# by the archive on stdin, and poll every 0.1 s until it has been reaped.
473 my $installpid = fork();
474 if ($installpid == 0)
476 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
481 last if $installpid == waitpid (-1, WNOHANG);
482 freeze_if_want_freeze ($installpid);
483 select (undef, undef, undef, 0.1);
485 Log (undef, "Install exited $?");
490 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
491 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# Report the job's main attributes and knobs, then set up the state for one
# scheduling round.  References among the attributes are JSON-encoded; the
# call that receives the value is not visible in this excerpt.
496 foreach (qw (script script_version script_parameters runtime_constraints))
500 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
502 foreach (split (/\n/, $Job->{knobs}))
504 Log (undef, "knob " . $_);
# undef means "outcome not decided yet".
509 $main::success = undef;
# Per-round success/failure counters.
515 my $thisround_succeeded = 0;
516 my $thisround_failed = 0;
517 my $thisround_failed_multiple = 0;
# Order pending jobsteps by level, then by index; the round works on the
# lowest level present.
519 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
520 or $a <=> $b } @jobstep_todo;
521 my $level = $jobstep[$jobstep_todo[0]]->{level};
522 Log (undef, "start level $level");
# Every slot starts out free.
527 my @freeslot = (0..$#slot);
530 my $progress_is_dirty = 1;
531 my $progress_stats_updated = 0;
533 update_progress_stats();
# Walk the todo list and start each eligible jobstep on the first free slot.
538 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
540 my $id = $jobstep_todo[$todo_ptr];
541 my $Jobstep = $jobstep[$id];
# Jobsteps at another level get different treatment; the branch body is not
# visible in this excerpt.
542 if ($Jobstep->{level} != $level)
# Pipe that carries the task's stdout/stderr back to this process; the read
# end is switched to non-blocking mode.
547 pipe $reader{$id}, "writer" or croak ($!);
548 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
549 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
551 my $childslot = $freeslot[0];
552 my $childnode = $slot[$childslot]->{node};
553 my $childslotname = join (".",
554 $slot[$childslot]->{node}->{name},
555 $slot[$childslot]->{cpu});
556 my $childpid = fork();
# The statements from here to the srun() call presumably run only in the
# forked child (the guarding test is not visible): restore default signal
# handling and send STDOUT/STDERR into the pipe.
559 $SIG{'INT'} = 'DEFAULT';
560 $SIG{'QUIT'} = 'DEFAULT';
561 $SIG{'TERM'} = 'DEFAULT';
563 foreach (values (%reader))
567 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
568 open(STDOUT,">&writer");
569 open(STDERR,">&writer");
# Environment seen by the task.
574 delete $ENV{"GNUPGHOME"};
575 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
576 $ENV{"TASK_QSEQUENCE"} = $id;
577 $ENV{"TASK_SEQUENCE"} = $level;
578 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Each script parameter is exported as JOB_PARAMETER_<NAME>, with a-z in the
# name upper-cased.
579 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
580 $param =~ tr/a-z/A-Z/;
581 $ENV{"JOB_PARAMETER_$param"} = $value;
583 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
584 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
# The task work directory is named after the jobstep index and the pid of
# the current process.
585 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
586 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
587 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
588 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
589 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
595 "--nodelist=".$childnode->{name},
596 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
597 "--job-name=$job_id.$id.$$",
# NOTE(review): @execargs is declared with "my" here and again further down.
599 my @execargs = qw(sh);
600 my $build_script_to_send = "";
602 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
603 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
604 ."&& cd $ENV{CRUNCH_TMP} ";
607 $build_script_to_send = $build_script;
612 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
613 my @execargs = ('bash', '-c', $command);
614 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# fork() returns undef on failure.
618 if (!defined $childpid)
# Bookkeeping for the started child.  jobstepname has the same
# "<job>.<step>.<pid>" form as the --job-name passed to srun.
625 $proc{$childpid} = { jobstep => $id,
628 jobstepname => "$job_id.$id.$childpid",
630 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
631 $slot[$childslot]->{pid} = $childpid;
633 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
634 Log ($id, "child $childpid started on $childslotname");
635 $Jobstep->{starttime} = time;
636 $Jobstep->{node} = $childnode->{name};
637 $Jobstep->{slotindex} = $childslot;
# Clear leftovers from any earlier attempt at this jobstep.
638 delete $Jobstep->{stderr};
639 delete $Jobstep->{finishtime};
# The jobstep is no longer pending.
641 splice @jobstep_todo, $todo_ptr, 1;
644 $progress_is_dirty = 1;
# Polling logic used while tasks are running: react to the signal flags,
# refresh progress, move slots on and off hold, and decide when to abandon the
# round.  Much of the surrounding loop structure is missing from this excerpt.
648 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
650 last THISROUND if $main::please_freeze;
651 if ($main::please_info)
653 $main::please_info = 0;
657 update_progress_stats();
664 check_refresh_wanted();
666 update_progress_stats();
667 select (undef, undef, undef, 0.1);
669 elsif (time - $progress_stats_updated >= 30)
671 update_progress_stats();
# Give up on the round after 8 repeated failures with no success at all, or
# after 16 repeated failures when they outnumber the successes.
673 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
674 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
676 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
677 .($thisround_failed+$thisround_succeeded)
678 .") -- giving up on this round";
679 Log (undef, $message);
683 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops run from the highest index down, so splicing out an element does
# not shift the entries still to be visited.
684 for (my $i=$#freeslot; $i>=0; $i--) {
685 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
686 push @holdslot, (splice @freeslot, $i, 1);
689 for (my $i=$#holdslot; $i>=0; $i--) {
690 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
691 push @freeslot, (splice @holdslot, $i, 1);
695 # give up if no nodes are succeeding
# A slot counts as healthy when its node has no losing streak and has been
# put on hold fewer than four times.
696 if (!grep { $_->{node}->{losing_streak} == 0 &&
697 $_->{node}->{hold_count} < 4 } @slot) {
698 my $message = "Every node has failed -- giving up on this round";
699 Log (undef, $message);
# Return all held slots to the free list and clear the losing streak of every
# free slot's node.
706 push @freeslot, splice @holdslot;
707 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
710 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
713 if ($main::please_continue) {
714 $main::please_continue = 0;
# On an info request: clear the flag, then freeze, collate output and save a
# log checkpoint.
717 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
721 check_refresh_wanted();
723 update_progress_stats();
724 select (undef, undef, undef, 0.1);
725 killem (keys %proc) if $main::please_freeze;
729 update_progress_stats();
730 freeze_if_want_freeze();
733 if (!defined $main::success)
736 $thisround_succeeded == 0 &&
737 ($thisround_failed == 0 || $thisround_failed > 4))
739 my $message = "stop because $thisround_failed tasks failed and none succeeded";
740 Log (undef, $message);
# While the outcome is still undecided, go round again for the next level.
749 goto ONELEVEL if !defined $main::success;
752 release_allocation();
# Store the collated output, the success flag and the finish time on the job
# record.  'success' is true only when there is an output and $main::success
# is true.
755 $Job->update_attributes('output' => &collate_output(),
757 'success' => $Job->{'output'} && $main::success,
758 'finished_at' => scalar gmtime)
# When there is an output, fetch its manifest text with whget and register it
# as a collection.
761 if ($Job->{'output'})
764 my $manifest_text = capturex("whget", $Job->{'output'});
765 $arv->{'collections'}->{'create'}->execute('collection' => {
766 'uuid' => $Job->{'output'},
767 'manifest_text' => $manifest_text,
# A persistent output also gets a 'resources' link from the current user to
# the collection.
769 if ($Job->{'output_is_persistent'}) {
770 $arv->{'links'}->{'create'}->execute('link' => {
771 'tail_kind' => 'arvados#user',
772 'tail_uuid' => $User->{'uuid'},
773 'head_kind' => 'arvados#collection',
774 'head_uuid' => $Job->{'output'},
775 'link_class' => 'resources',
# Reports the error held in $@; the eval and test that lead here are not
# visible in this excerpt.
781 Log (undef, "Failed to register output manifest: $@");
785 Log (undef, "finish");
# Publish the todo/done/running task counts to the job's tasks_summary and to
# the log.  Always refreshes $progress_stats_updated, but does nothing else
# unless progress has been marked dirty.
792 sub update_progress_stats
794 $progress_stats_updated = time;
795 return if !$progress_is_dirty;
# "running" is the number of slots that are neither free nor on hold.
796 my ($todo, $done, $running) = (scalar @jobstep_todo,
797 scalar @jobstep_done,
798 scalar @slot - scalar @freeslot - scalar @holdslot);
799 $Job->{'tasks_summary'} ||= {};
800 $Job->{'tasks_summary'}->{'todo'} = $todo;
801 $Job->{'tasks_summary'}->{'done'} = $done;
802 $Job->{'tasks_summary'}->{'running'} = $running;
804 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
806 Log (undef, "status: $done done, $running running, $todo todo");
807 $progress_is_dirty = 0;
# Body of the child-reaping routine (its "sub" line is not visible in this
# excerpt).  Reaps at most one finished child without blocking, classifies the
# result as success or temporary/permanent failure, frees the slot and queues
# any tasks the finished task created.  Returns 0 when no child was reaped.
814 my $pid = waitpid (-1, WNOHANG);
815 return 0 if $pid <= 0;
817 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
819 . $slot[$proc{$pid}->{slot}]->{cpu});
820 my $jobstepid = $proc{$pid}->{jobstep};
821 my $elapsed = time - $proc{$pid}->{time};
822 my $Jobstep = $jobstep[$jobstepid];
# Wait status of the reaped child: exit value in the high byte, bit 128 set
# when a core was dumped.
824 my $childstatus = $?;
825 my $exitvalue = $childstatus >> 8;
826 my $exitinfo = sprintf("exit %d signal %d%s",
829 ($childstatus & 128 ? ' core dump' : ''));
# The task's own success flag is re-read from the API rather than inferred
# from the exit status.
830 $Jobstep->{'arvados_task'}->reload;
831 my $task_success = $Jobstep->{'arvados_task'}->{success};
833 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
835 if (!defined $task_success) {
836 # task did not indicate one way or the other --> fail
837 $Jobstep->{'arvados_task'}->{success} = 0;
838 $Jobstep->{'arvados_task'}->save;
# A failure is temporary when the node was flagged as failed or the task
# exited with status 111.
845 $temporary_fail ||= $Jobstep->{node_fail};
846 $temporary_fail ||= ($exitvalue == 111);
849 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
851 # Check for signs of a failed or misconfigured node
852 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
853 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
854 # Don't count this against jobstep failure thresholds if this
855 # node is already suspected faulty and srun exited quickly
856 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
858 Log ($jobstepid, "blaming failure on suspect node " .
859 $slot[$proc{$pid}->{slot}]->{node}->{name});
860 $temporary_fail ||= 1;
862 ban_node_by_slot($proc{$pid}->{slot});
865 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
866 ++$Jobstep->{'failures'},
867 $temporary_fail ? 'temporary ' : 'permanent',
# A permanent failure, or a third failure of the same jobstep, stops the job.
870 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
871 # Give up on this task, and the whole job
873 $main::please_freeze = 1;
876 # Put this task back on the todo queue
877 push @jobstep_todo, $jobstepid;
879 $Job->{'tasks_summary'}->{'failed'}++;
# Success handling (the branch structure is not visible in this excerpt):
# clear the node's losing streak and hold, and record the jobstep as done.
883 ++$thisround_succeeded;
884 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
885 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
886 push @jobstep_done, $jobstepid;
887 Log ($jobstepid, "success in $elapsed seconds");
# Record the outcome on the jobstep and flush its remaining stderr.
889 $Jobstep->{exitcode} = $childstatus;
890 $Jobstep->{finishtime} = time;
891 process_stderr ($jobstepid, $task_success);
892 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Close the task's pipe and hand its slot back to the free list.
894 close $reader{$jobstepid};
895 delete $reader{$jobstepid};
896 delete $slot[$proc{$pid}->{slot}]->{pid};
897 push @freeslot, $proc{$pid}->{slot};
# Queue the tasks created by the task that just finished, in qsequence
# order; each one's level is taken from its 'sequence'.
901 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
903 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
905 'order' => 'qsequence'
907 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
909 'level' => $arvados_task->{'sequence'},
911 'arvados_task' => $arvados_task
913 push @jobstep, $jobstep;
914 push @jobstep_todo, $#jobstep;
917 $progress_is_dirty = 1;
# Re-read the job's cancellation attributes from the API when the file named
# by CRUNCH_REFRESH_TRIGGER has been modified since the last refresh, and
# request a freeze if the job has been cancelled.
921 sub check_refresh_wanted
923 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the file's modification time; @stat is empty if stat failed.
924 if (@stat && $stat[9] > $latest_refresh) {
925 $latest_refresh = scalar time;
927 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Only the cancellation attributes are copied from the fresh record.
928 for my $attr ('cancelled_at',
929 'cancelled_by_user_uuid',
930 'cancelled_by_client_uuid') {
931 $Job->{$attr} = $Job2->{$attr};
933 if ($Job->{'cancelled_at'}) {
934 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
935 " by user " . $Job->{cancelled_by_user_uuid});
937 $main::please_freeze = 1;
# Body of the routine that cross-checks child processes against squeue (its
# "sub" line is not visible in this excerpt).  The kill list is examined at
# most every 4 seconds and squeue is consulted at most every 60 seconds.
945 # return if the kill list was checked <4 seconds ago
946 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
950 $squeue_kill_checked = time;
952 # use killem() on procs whose killtime is reached
955 if (exists $proc{$_}->{killtime}
956 && $proc{$_}->{killtime} <= time)
962 # return if the squeue was checked <60 seconds ago
963 if (defined $squeue_checked && $squeue_checked > time - 60)
967 $squeue_checked = time;
971 # here is an opportunity to check for mysterious problems with local procs
975 # get a list of steps still running
# "&& echo ok" appends a final "ok" line only when squeue itself succeeded.
976 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
# NOTE(review): this comparison only matches if trailing newlines have been
# removed from @squeue, which is not visible here -- confirm.
978 if ($squeue[-1] ne "ok")
984 # which of my jobsteps are running, according to squeue?
# Lines look like "<jobid>.<stepid> <name>"; only steps of our own Slurm job
# are of interest.
988 if (/^(\d+)\.(\d+) (\S+)/)
990 if ($1 eq $ENV{SLURM_JOBID})
997 # which of my active child procs (>60s old) were not mentioned by squeue?
1000 if ($proc{$_}->{time} < time - 60
1001 && !exists $ok{$proc{$_}->{jobstepname}}
1002 && !exists $proc{$_}->{killtime})
1004 # kill this proc if it hasn't exited in 30 seconds
1005 $proc{$_}->{killtime} = time + 30;
# Give the Slurm allocation back by cancelling the Slurm job named in
# SLURM_JOBID.  Any guard around these statements is not visible in this
# excerpt.
1011 sub release_allocation
1015 Log (undef, "release job allocation");
1016 system "scancel $ENV{SLURM_JOBID}";
# Drain whatever can currently be read from each task's pipe into that
# jobstep's stderr buffer (the enclosing "sub" line is not visible here).
1024 foreach my $job (keys %reader)
1027 while (0 < sysread ($reader{$job}, $buf, 8192))
1029 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1030 $jobstep[$job]->{stderr} .= $buf;
1031 preprocess_stderr ($job);
# Cap the buffer: once it exceeds 16384 bytes, discard its first 8192 bytes.
1032 if (length ($jobstep[$job]->{stderr}) > 16384)
1034 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume the complete lines at the front of a jobstep's buffered stderr, log
# each one, and react to known srun error messages.
1043 sub preprocess_stderr
1047 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the consumed line and its newline from the buffer.
1049 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1050 Log ($job, "stderr $line");
# An expired or unconfirmable allocation requests a freeze; a node failure or
# a job step that could not be created flags the jobstep and puts its node on
# hold.
# NOTE(review): this pattern reads SLURM_JOB_ID whereas other code in this
# file reads SLURM_JOBID, and the value is interpolated into the regex
# unescaped -- confirm that both are intended.
1051 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1053 $main::please_freeze = 1;
1055 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1056 $jobstep[$job]->{node_fail} = 1;
1057 ban_node_by_slot($jobstep[$job]->{slotindex});
# Final stderr flush for a finished jobstep (the "sub" line is not visible in
# this excerpt): handle the complete lines first, then log whatever is left in
# the buffer line by line.
1066 my $task_success = shift;
1067 preprocess_stderr ($job);
1070 Log ($job, "stderr $_");
1071 } split ("\n", $jobstep[$job]->{stderr});
# Combine the outputs of the successful tasks into one job output (the "sub"
# line and the loop header are not visible in this excerpt).
1077 my $whc = Warehouse->new;
1078 Log (undef, "collate");
1079 $whc->write_start (1);
# Skip tasks that have no output, did not report success, or exited non-zero.
1083 next if (!exists $_->{'arvados_task'}->{output} ||
1084 !$_->{'arvados_task'}->{'success'} ||
1085 $_->{'exitcode'} != 0);
1086 my $output = $_->{'arvados_task'}->{output};
# Three cases: output that is not a bare 32-hex locator is written verbatim;
# with exactly one jobstep the locator itself becomes the job output;
# otherwise the block it names is fetched and appended.
1087 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1089 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1090 $whc->write_data ($output);
1092 elsif (@jobstep == 1)
1094 $joboutput = $output;
1097 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1099 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1100 $whc->write_data ($outblock);
1104 my $errstr = $whc->errstr;
1105 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
# Unless a single task's locator was adopted directly, finish the combined
# stream to obtain the job output.
1109 $joboutput = $whc->write_finish if !defined $joboutput;
1112 Log (undef, "output $joboutput");
1113 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1117 Log (undef, "output undef");
# Signal escalation for one child pid held in $_ (the "sub" line and the loop
# header are not visible in this excerpt).  Start with SIGINT; move on to
# SIGTERM and then SIGKILL once the previous signal was sent more than 4
# seconds ago.  Each signal is recorded under "sent_<sig>" so that it is only
# issued once per process.
1127 my $sig = 2; # SIGINT first
1128 if (exists $proc{$_}->{"sent_$sig"} &&
1129 time - $proc{$_}->{"sent_$sig"} > 4)
1131 $sig = 15; # SIGTERM if SIGINT doesn't work
1133 if (exists $proc{$_}->{"sent_$sig"} &&
1134 time - $proc{$_}->{"sent_$sig"} > 4)
1136 $sig = 9; # SIGKILL if SIGTERM doesn't work
1138 if (!exists $proc{$_}->{"sent_$sig"})
1140 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1142 select (undef, undef, undef, 0.1);
1145 kill $sig, $_; # srun wants two SIGINT to really interrupt
1147 $proc{$_}->{"sent_$sig"} = time;
# Seconds the process had been running when it was signalled.
1148 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set, in the bit vector $bits, the bit numbered by the file descriptor of the
# handle in $_ (the enclosing routine is not visible in this excerpt).
1158 vec($bits,fileno($_),1) = 1;
# Write one log message to STDERR and, when a log stream is open, to
# $metastream.
#   $_[0]  jobstep id (callers pass undef for job-level messages)
#   $_[1]  message text; text containing newlines is handled one line at a time
1164 sub Log # ($jobstep_id, $logmessage)
1166 if ($_[1] =~ /\n/) {
1167 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1172 my $fh = select STDERR; $|=1; select $fh;
1173 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape every character outside the range space..~ (octal 176) as a
# backslash followed by its three-digit octal code.
1174 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A UTC timestamp is needed only for the log stream or a terminal.
1177 if ($metastream || -t STDERR) {
1178 my @gmtime = gmtime;
1179 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1180 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp prefix only when it is a terminal; the log stream
# always gets it.
1182 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1184 return if !$metastream;
1185 $metastream->write_data ($datetime . " " . $message);
# Fatal-error handling (the "sub" lines are not visible in this excerpt;
# presumably this is the local croak() followed by a cleanup routine).  Logs
# the message with the caller's file and line, saves what can be saved, then
# marks the job record as no longer running.
1191 my ($package, $file, $line) = caller;
1192 my $message = "@_ at $file line $line\n";
1193 Log (undef, $message);
# Freeze and collate only while work is still pending.
1194 freeze() if @jobstep_todo;
1195 collate_output() if @jobstep_todo;
1197 save_meta() if $metastream;
# Local jobs have no record to update.
1204 return if !$job_has_uuid;
1205 $Job->update_attributes('running' => 0,
1207 'finished_at' => scalar gmtime);
# Store the accumulated log stream and record its locator on the job (the
# "sub" line is not visible in this excerpt; presumably save_meta).
1213 my $justcheckpoint = shift; # false if this will be the last meta saved
# For a checkpoint, work on a copy of the stream rather than the live one.
1214 my $m = $metastream;
1215 $m = $m->copy if $justcheckpoint;
1217 my $whc = Warehouse->new;
1218 my $loglocator = $whc->store_block ($m->as_string);
1219 $arv->{'collections'}->{'create'}->execute('collection' => {
1220 'uuid' => $loglocator,
1221 'manifest_text' => $m->as_string,
1223 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1224 Log (undef, "log manifest is $loglocator");
1225 $Job->{'log'} = $loglocator;
1226 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# If a freeze has been requested: release the allocation, then signal and
# reap the child processes.  Any pids passed as arguments are added to %proc
# so that they are signalled as well.
1230 sub freeze_if_want_freeze
1232 if ($main::please_freeze)
1234 release_allocation();
1237 # kill some srun procs before freeze+stop
1238 map { $proc{$_} = {} } @_;
1241 killem (keys %proc);
1242 select (undef, undef, undef, 0.1);
# Forget every child that has exited, without blocking.
1244 while (($died = waitpid (-1, WNOHANG)) > 0)
1246 delete $proc{$died};
# Freeze/thaw support (the "sub" lines are not visible in this excerpt).
# Freezing only logs that it is not implemented.
1261 Log (undef, "Freeze not implemented");
# NOTE(review): if this croak is unconditional, the thaw code that follows is
# unreachable -- confirm.
1268 croak ("Thaw not implemented");
1272 Log (undef, "thaw from $key");
1277 @jobstep_tomerge = ();
1278 $jobstep_tomerge_level = 0;
# The frozen state is read as records separated by blank lines from the
# stream named by the comma-separated $key.
1281 my $stream = new Warehouse::Stream ( whc => $whc,
1282 hash => [split (",", $key)] );
1284 while (my $dataref = $stream->read_until (undef, "\n\n"))
# A record beginning "job " holds key=value lines describing the job.
1286 if ($$dataref =~ /^job /)
1288 foreach (split ("\n", $$dataref))
1290 my ($k, $v) = split ("=", $_, 2);
1291 $frozenjob->{$k} = freezeunquote ($v);
# A "merge <level> <list>" record carries the merge level and a quoted list.
1296 if ($$dataref =~ /^merge (\d+) (.*)/)
1298 $jobstep_tomerge_level = $1;
1300 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
# The lines below rebuild a jobstep from key=value lines with its failure
# count reset.
1305 foreach (split ("\n", $$dataref))
1307 my ($k, $v) = split ("=", $_, 2);
1308 $Jobstep->{$k} = freezeunquote ($v) if $k;
1310 $Jobstep->{'failures'} = 0;
1311 push @jobstep, $Jobstep;
# Jobsteps that exited with status "0" are done; the rest are queued again.
1313 if ($Jobstep->{exitcode} eq "0")
1315 push @jobstep_done, $#jobstep;
1319 push @jobstep_todo, $#jobstep;
# Restore the job's script attributes from the frozen copy.
1323 foreach (qw (script script_version script_parameters))
1325 $Job->{$_} = $frozenjob->{$_};
1327 $Job->save if $job_has_uuid;
# Undo backslash escaping in $s: "\n" becomes a newline and any other escaped
# character stands for itself (the enclosing routine is not visible here).
1343 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Body of the srun() wrapper (its "sub" line is not visible in this excerpt).
# Arguments: srun argument list (array ref), command list (array ref) and an
# optional options hash ref; a later argument supplies data for the command's
# stdin (its unpacking is not shown).
1350 my $srunargs = shift;
1351 my $execargs = shift;
1352 my $opts = shift || {};
# Without Slurm the srun prefix is dropped and the command is run directly.
1354 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# With CRUNCH_DEBUG set, echo the command to STDERR, single-quoting any
# argument that contains a space.
1355 print STDERR (join (" ",
1356 map { / / ? "'$_'" : $_ }
1359 if $ENV{CRUNCH_DEBUG};
# When stdin data was given, open "-|" forks a child whose STDOUT is connected
# to this process's STDIN; the print/close below are the child's job of
# feeding the data through.
1361 if (defined $stdin) {
1362 my $child = open STDIN, "-|";
1363 defined $child or die "no fork: $!";
1365 print $stdin or die $!;
1366 close STDOUT or die $!;
# With $opts->{fork}, run the command via system() and return its status.
1371 return system (@$args) if $opts->{fork};
# Reaching these lines means the command could not be executed.
1374 warn "ENV size is ".length(join(" ",%ENV));
1375 die "exec failed: $!: @$args";
# Put the node that owns slot $slotid on hold for 60 seconds and count the
# hold against the node.
1379 sub ban_node_by_slot {
1380 # Don't start any new jobsteps on this node for 60 seconds
1382 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1383 $slot[$slotid]->{node}->{hold_count}++;
1384 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Open (creating or truncating) the lock file and take an exclusive,
# non-blocking lock on it, aborting with $error_message if the lock is already
# held.  L is a package-level bareword handle, so it is not closed when this
# routine returns.  The "sub" line is not visible in this excerpt.
1389 my ($lockfile, $error_message) = @_;
1390 open L, ">", $lockfile or croak("$lockfile: $!");
1391 if (!flock L, LOCK_EX|LOCK_NB) {
1392 croak("Can't lock $lockfile: $error_message\n");
# NOTE(review): this appears to be the separate build script that the main
# program reads through <DATA> and pipes to "perl -" -- confirm.
# It unpacks the source archive into CRUNCH_SRC and runs the install steps.
1399 # checkout-and-build
1403 my $destdir = $ENV{"CRUNCH_SRC"};
1404 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1405 my $repo = $ENV{"CRUNCH_SRC_URL"};
1407 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# "$destdir.commit" is a symlink whose target is the installed commit id.
1409 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
# Remove the marker before rebuilding and send all further output to
# "$destdir.log".
1413 unlink "$destdir.commit";
1414 open STDOUT, ">", "$destdir.log";
1415 open STDERR, ">&STDOUT";
# The archive data follows this script on the DATA handle and is extracted
# with tar.
1418 my @git_archive_data = <DATA>;
1419 if (@git_archive_data) {
1420 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1421 print TARX @git_archive_data;
1423 die "'tar -C $destdir -xf -' exited $?: $!";
1428 chomp ($pwd = `pwd`);
1429 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Build and install the Python SDK from the tree into a virtualenv at the
# install directory.
1432 for my $src_path ("$destdir/arvados/sdk/python") {
1434 shell_or_die ("virtualenv", $install_dir);
1435 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run the first install hook found: crunch_scripts/install, then
# tests/autotests.sh (only when there is no install.sh), then install.sh.
1439 if (-e "$destdir/crunch_scripts/install") {
1440 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1441 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1443 shell_or_die ("./tests/autotests.sh", $install_dir);
1444 } elsif (-e "./install.sh") {
1445 shell_or_die ("./install.sh", $install_dir);
# Publish the new commit marker by creating it under a temporary name and
# renaming it into place.
1449 unlink "$destdir.commit.new";
1450 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1451 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# Body of the command-running helper (its "sub" line and the call that runs
# the command are not visible in this excerpt).  With DEBUG set, the command
# is echoed to STDERR first; a failure is fatal and reports $! and the status
# in hex.
1460 if ($ENV{"DEBUG"}) {
1461 print STDERR "@_\n";
1464 or die "@_ failed: $! exit 0x".sprintf("%x",$?);