2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when the job finishes.
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
# Scratch-space setup: derive crunch-job's working directories from TMPDIR
# (defaulting to /tmp) and publish them through %ENV.
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# Unless USER is "crunch" or the real uid is 0, the directory name gets a
# "-<uid>" suffix.
79 if ($ENV{"USER"} ne "crunch" && $< != 0) {
80 # use a tmp dir unique for my uid
81 $ENV{"CRUNCH_TMP"} .= "-$<";
83 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
84 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# NOTE(review): mkdir's return value is ignored and mkdir creates only the
# final path component -- confirm CRUNCH_TMP itself is created elsewhere.
85 mkdir ($ENV{"JOB_WORK"});
# Command-line handling: bind the long options to their variables, then
# derive the run mode from the environment and from the shape of the job
# spec.
92 GetOptions('force-unlock' => \$force_unlock,
93 'git-dir=s' => \$git_dir,
95 'job-api-token=s' => \$job_api_token,
96 'resume-stash=s' => \$resume_stash,
# A token supplied via --job-api-token is exported as ARVADOS_API_TOKEN.
99 if (defined $job_api_token) {
100 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# SLURM counts as available only when both SLURM_JOBID and SLURM_NODELIST
# exist in the environment.
103 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A job spec consisting solely of lowercase letters, digits and dashes is
# treated as a job UUID; anything else makes this a local job.
104 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
105 my $local_job = !$job_has_uuid;
# CRUNCH_DEBUG is set to 1 or 0; the conditions selecting each value are
# not visible in this listing.
114 $main::ENV{CRUNCH_DEBUG} = 1;
118 $main::ENV{CRUNCH_DEBUG} = 0;
# Create the Arvados API client and the Warehouse stream that accumulates
# this job's log (started under the name log.txt), then fetch the record
# returned by the API's users/current call.
# The Warehouse object is built with direct method-call syntax
# (Warehouse->new), matching the other Warehouse->new call in this file;
# the indirect-object form "new Warehouse" depends on how perl happens to
# parse the bareword.
123 my $arv = Arvados->new;
124 my $metastream = Warehouse::Stream->new(whc => Warehouse->new);
126 $metastream->write_start('log.txt');
128 my $User = $arv->{'users'}->{'current'}->execute;
# Fetch the job record by UUID and, unless --force-unlock was given, refuse
# to run a job that already looks locked, finished, running or started.
# The 'success' check uses defined(): "ne undef" actually compares against
# the empty string (and warns about an uninitialized value), so it is not a
# real test for "not null".
136 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
137 if (!$force_unlock) {
138 if ($Job->{'is_locked_by_uuid'}) {
139 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
141 if (defined $Job->{'success'}) {
142 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
144 if ($Job->{'running'}) {
145 croak("Job 'running' flag is already set");
147 if ($Job->{'started_at'}) {
148 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Job spec given inline: $jobspec is decoded as JSON, the three required
# attributes are checked (croak on the first one that is missing or false),
# and the job is stamped as locked by the current user and started now
# before a job record is created through the API.
154 $Job = JSON::decode_json($jobspec);
158 map { croak ("No $_ specified") unless $Job->{$_} }
159 qw(script script_version script_parameters);
162 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
# gmtime in scalar context yields a human-readable date string.
163 $Job->{'started_at'} = gmtime;
165 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
# $job_id is the job record's uuid.
169 $job_id = $Job->{'uuid'};
# Build the table of execution slots. Each @sinfo entry is a
# "<cpu count> <node list>" line; each node named in it contributes one
# slot per CPU, and the CPU count is capped by
# resource_limits.max_tasks_per_node when that limit is non-zero.
173 $Job->{'resource_limits'} ||= {};
174 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
175 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
178 Log (undef, "check slurm allocation");
181 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Count the "processor" lines in /proc/cpuinfo; a count of 0 falls back
# to 1.
185 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
186 push @sinfo, "$localcpus localhost";
# With a SLURM node list in the environment, sinfo is asked for the CPU
# count and names of those nodes.
188 if (exists $ENV{SLURM_NODELIST})
190 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
194 my ($ncpus, $slurm_nodelist) = split;
195 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated entry at a time off the node list; an entry may
# end in a bracketed group of numbers and ranges.
198 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
201 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
204 foreach (split (",", $ranges))
217 push @nodelist, map {
219 $n =~ s/\[[-,\d]+\]/$_/;
226 push @nodelist, $nodelist;
# One node record per name, and one slot per CPU of that node.
229 foreach my $nodename (@nodelist)
231 Log (undef, "node $nodename - $ncpus slots");
232 my $node = { name => $nodename,
236 foreach my $cpu (1..$ncpus)
238 push @slot, { node => $node,
242 push @node, @nodelist;
247 # Ensure that we get one jobstep running on each allocated node before
248 # we start overloading nodes with concurrent steps
250 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
# Mark the job as locked by, and started by, the current user, reset its
# status fields, then save it and verify that the lock is really ours.
# The lock owner is compared with eq: UUIDs are strings, and numeric ==
# would numify both sides, so two different non-numeric UUIDs would compare
# equal and the check could never fail.
257 # Claim this job, and make sure nobody else does
259 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
260 $Job->{'started_at'} = gmtime;
261 $Job->{'running'} = 1;
262 $Job->{'success'} = undef;
263 $Job->{'tasks_summary'} = { 'failed' => 0,
268 unless ($Job->save() && $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
269 croak("Error while updating / locking job");
# Install signal handlers and reset the request flags they set. INT, QUIT
# and TSTP request a freeze, ALRM requests an info checkpoint, CONT requests
# a continue; TERM calls croak directly. The flags are examined later by the
# scheduling code.
275 Log (undef, "start");
276 $SIG{'INT'} = sub { $main::please_freeze = 1; };
277 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
278 $SIG{'TERM'} = \&croak;
279 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
280 $SIG{'ALRM'} = sub { $main::please_info = 1; };
281 $SIG{'CONT'} = sub { $main::please_continue = 1; };
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Export every knob line of the form NOCACHE...=value into the environment
# (grep is used here only for its side effect).
287 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
# The job UUID is exported under two names.
288 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
289 $ENV{"JOB_UUID"} = $job_id;
# Task bookkeeping. @jobstep_todo and @jobstep_done hold indexes into
# @jobstep.
293 my @jobstep_todo = ();
294 my @jobstep_done = ();
295 my @jobstep_tomerge = ();
296 my $jobstep_tomerge_level = 0;
298 my $squeue_kill_checked;
299 my $output_in_keep = 0;
# A job carrying thawedfromkey is restored through thaw().
303 if (defined $Job->{thawedfromkey})
305 thaw ($Job->{thawedfromkey});
# The initial task record is created through the API and queued as
# jobstep 0 at level 0 (presumably the alternative to thawing -- the
# branch structure is not visible in this listing).
309 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
310 'job_uuid' => $Job->{'uuid'},
315 push @jobstep, { 'level' => 0,
317 'arvados_task' => $first_task,
319 push @jobstep_todo, 0;
# Decide whether code installation can be skipped, then wipe the work and
# install directories on the allocated nodes.
326 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# Installation is skipped for a local job whose script_version is an
# absolute path.
328 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
# Presumably on the skip path: script_version is used directly as the
# source tree.
331 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
# The installer program is read from this file's DATA section.
337 $build_script = <DATA>;
339 Log (undef, "Install revision ".$Job->{script_version});
340 my $nodelist = join(",", @node);
342 # Clean out crunch_tmp/work and crunch_tmp/opt
344 my $cleanpid = fork();
347 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
348 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
# Wait for the cleanup child without blocking: check, honour a pending
# freeze request, sleep 0.1 s.
353 last if $cleanpid == waitpid (-1, WNOHANG);
354 freeze_if_want_freeze ($cleanpid);
355 select (undef, undef, undef, 0.1);
357 Log (undef, "Clean-work-dir exited $?");
# Resolve the job's script_version to a full 40-hex commit id in a local
# clone of the repository, then prepare the shell command and the
# "git archive" payload used to install that commit. croaks when no commit
# id can be determined.
359 # Install requested code version
362 my @srunargs = ("srun",
363 "--nodelist=$nodelist",
364 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
366 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
367 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
368 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
372 my $treeish = $Job->{'script_version'};
# --git-dir takes precedence over CRUNCH_DEFAULT_GIT_DIR.
373 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
374 # Todo: let script_version specify repository instead of expecting
375 # parent process to figure it out.
376 $ENV{"CRUNCH_SRC_URL"} = $repo;
378 # Create/update our clone of the remote git repo
380 if (!-d $ENV{"CRUNCH_SRC"}) {
381 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
382 or croak ("git clone $repo failed: exit ".($?>>8));
383 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
# The origin URL is taken from the CRUNCH_SRC_URL environment variable by
# the shell (note the escaped \$), not interpolated by perl.
385 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
387 # If this looks like a subversion r#, look for it in git-svn commit messages
389 if ($treeish =~ m{^\d{1,4}$}) {
390 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
392 if ($gitlog =~ /^[a-f0-9]{40}$/) {
394 Log (undef, "Using commit $commit for script_version $treeish");
398 # If that didn't work, try asking git to look it up as a tree-ish.
400 if (!defined $commit) {
402 my $cooked_treeish = $treeish;
403 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
404 # Looks like a git branch name -- make sure git knows it's
405 # relative to the remote repo
406 $cooked_treeish = "origin/$treeish";
# NOTE(review): $cooked_treeish derives from the job's script_version and is
# interpolated unquoted into a shell command line -- confirm the value is
# validated before it gets here.
409 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
411 if ($found =~ /^[0-9a-f]{40}$/s) {
413 if ($commit ne $treeish) {
414 # Make sure we record the real commit id in the database,
415 # frozentokey, logs, etc. -- instead of an abbreviation or a
416 # branch name which can become ambiguous or point to a
417 # different commit in the future.
418 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
419 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this test repeats the enclosing one; no visible statement
# changes either operand in between.
420 if ($commit ne $treeish) {
421 $Job->{'script_version'} = $commit;
# The job record is saved only when the job has a UUID.
422 !$job_has_uuid or $Job->save() or croak("Error while updating job");
428 if (defined $commit) {
429 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
430 @execargs = ("sh", "-c",
431 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
# The whole archive of the commit is captured in memory.
432 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
435 croak ("could not figure out commit id for $treeish");
# Run the installer in a forked child. The build script followed by the
# archive is handed to srun as its fourth argument (presumably the data fed
# to the command's stdin -- confirm against srun's argument handling).
# NOTE(review): a failed fork returns undef, which also satisfies "== 0";
# the visible lines do not distinguish that case from the child.
438 my $installpid = fork();
439 if ($installpid == 0)
441 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Wait for the installer without blocking: check, honour a pending freeze
# request, sleep 0.1 s.
446 last if $installpid == waitpid (-1, WNOHANG);
447 freeze_if_want_freeze ($installpid);
448 select (undef, undef, undef, 0.1);
450 Log (undef, "Install exited $?");
# Walk the job's main attributes and its knob lines for logging. Attribute
# values that are references are serialised as JSON (the call that receives
# the value is not visible in this listing).
455 foreach (qw (script script_version script_parameters resource_limits))
459 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
# One log line per knob line.
461 foreach (split (/\n/, $Job->{knobs}))
463 Log (undef, "knob " . $_);
# Start of a scheduling round: reset the per-round counters, order the
# pending jobsteps by level (ties broken by jobstep index) and take the
# level of the first one as the level to run.
474 my $thisround_succeeded = 0;
475 my $thisround_failed = 0;
476 my $thisround_failed_multiple = 0;
478 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
479 or $a <=> $b } @jobstep_todo;
480 my $level = $jobstep[$jobstep_todo[0]]->{level};
481 Log (undef, "start level $level");
# Every slot index starts out in the free list.
486 my @freeslot = (0..$#slot);
# Progress is marked dirty so that the first update_progress_stats() call
# reports.
489 my $progress_is_dirty = 1;
490 my $progress_stats_updated = 0;
492 update_progress_stats();
# Launch loop: walk the pending jobsteps by position in @jobstep_todo.
497 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
499 $main::please_continue = 0;
501 my $id = $jobstep_todo[$todo_ptr];
502 my $Jobstep = $jobstep[$id];
# Steps belonging to another level, and steps with more than 9 recorded
# attempts, are singled out here (the bodies of these tests are not visible
# in this listing).
503 if ($Jobstep->{level} != $level)
507 if ($Jobstep->{attempts} > 9)
509 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
# Pipe for the child's output. The write end uses the global handle named
# "writer"; the read end is kept per jobstep and switched to non-blocking
# mode.
514 pipe $reader{$id}, "writer" or croak ($!);
515 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
516 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# The task goes to the first entry of the free-slot list; the slot's name is
# "<node name>.<cpu>".
518 my $childslot = $freeslot[0];
519 my $childnode = $slot[$childslot]->{node};
520 my $childslotname = join (".",
521 $slot[$childslot]->{node}->{name},
522 $slot[$childslot]->{cpu});
# Fork the task process. The lines that follow are presumably the child
# branch (the test on $childpid is not visible in this listing): restore
# default signal handling, send STDOUT and STDERR into the pipe's write end,
# publish the task's context through %ENV and hand over to srun().
523 my $childpid = fork();
526 $SIG{'INT'} = 'DEFAULT';
527 $SIG{'QUIT'} = 'DEFAULT';
528 $SIG{'TERM'} = 'DEFAULT';
530 foreach (values (%reader))
534 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
535 open(STDOUT,">&writer");
536 open(STDERR,">&writer");
541 delete $ENV{"GNUPGHOME"};
542 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
543 $ENV{"TASK_QSEQUENCE"} = $id;
544 $ENV{"TASK_SEQUENCE"} = $level;
545 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Each script parameter is exported as JOB_PARAMETER_<NAME>, with a-z in the
# name upper-cased.
546 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
547 $param =~ tr/a-z/A-Z/;
548 $ENV{"JOB_PARAMETER_$param"} = $value;
550 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
551 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
# The task's work directory is named after the slot's cpu number.
552 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
553 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
554 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
560 "--nodelist=".$childnode->{name},
561 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
562 "--job-name=$job_id.$id.$$",
564 my @execargs = qw(sh);
565 my $build_script_to_send = "";
567 "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
568 ."&& cd $ENV{CRUNCH_TMP} ";
571 $build_script_to_send = $build_script;
# PYTHONPATH gets the SDK directory under CRUNCH_SRC prepended (with a ":"
# separator when it already had a value) and a fixed system path appended.
575 $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
576 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
577 $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
579 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
# NOTE(review): @execargs is declared with "my" a second time here; the
# qw(sh) value assigned above looks unused -- confirm both declarations
# share a scope.
580 my @execargs = ('bash', '-c', $command);
581 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# Parent-side bookkeeping for the task just forked. The handling of a failed
# fork (undefined pid) is not visible in this listing.
585 if (!defined $childpid)
# Record the child in %proc, keyed by pid.
592 $proc{$childpid} = { jobstep => $id,
595 jobstepname => "$job_id.$id.$childpid",
# A slot must not already have a pid attached.
597 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
598 $slot[$childslot]->{pid} = $childpid;
600 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
601 Log ($id, "child $childpid started on $childslotname");
# Count the attempt, remember where and when the step runs, and clear
# leftovers from any earlier run of this step.
602 $Jobstep->{attempts} ++;
603 $Jobstep->{starttime} = time;
604 $Jobstep->{node} = $childnode->{name};
605 $Jobstep->{slotindex} = $childslot;
606 delete $Jobstep->{stderr};
607 delete $Jobstep->{finishtime};
# The step leaves the todo list at the current loop position.
609 splice @jobstep_todo, $todo_ptr, 1;
612 $progress_is_dirty = 1;
# Housekeeping between launches: react to freeze/info requests, refresh
# progress statistics, abort the round when failures dominate, and move
# slots between the free and hold lists. The first visible line is the tail
# of a condition whose beginning is not visible in this listing.
616 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
# A freeze request abandons the round.
618 last THISROUND if $main::please_freeze;
619 if ($main::please_info)
621 $main::please_info = 0;
625 update_progress_stats();
633 update_progress_stats();
634 select (undef, undef, undef, 0.1);
636 elsif (time - $progress_stats_updated >= 30)
638 update_progress_stats();
# Too many repeated failures: at least 8 with no success at all, or at least
# 16 and more than the number of successes.
640 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
641 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
643 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
644 .($thisround_failed+$thisround_succeeded)
645 .") -- giving up on this round";
646 Log (undef, $message);
650 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops run from the end of the list so that splice does not disturb
# the indexes still to be visited.
651 for (my $i=$#freeslot; $i>=0; $i--) {
652 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
653 push @holdslot, (splice @freeslot, $i, 1);
656 for (my $i=$#holdslot; $i>=0; $i--) {
657 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
658 push @freeslot, (splice @holdslot, $i, 1);
662 # give up if no nodes are succeeding
# A node counts as usable while its losing streak is 0 and it has been put
# on hold fewer than 4 times.
663 if (!grep { $_->{node}->{losing_streak} == 0 &&
664 $_->{node}->{hold_count} < 4 } @slot) {
665 my $message = "Every node has failed -- giving up on this round";
666 Log (undef, $message);
# All held slots return to the free list and the losing streak of every
# free slot's node is cleared.
673 push @freeslot, splice @holdslot;
674 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
# End of a round: wait for the remaining children, servicing continue, info
# and freeze requests meanwhile, then decide whether another level must be
# run.
677 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
# A continue request restarts the round.
680 goto THISROUND if $main::please_continue;
# An info request clears the flag, then freezes, collates output and saves
# a checkpoint copy of the log (save_meta(1)).
681 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
686 update_progress_stats();
687 select (undef, undef, undef, 0.1);
# With a freeze pending, every remaining child is signalled.
688 killem (keys %proc) if $main::please_freeze;
692 update_progress_stats();
693 freeze_if_want_freeze();
# While the overall outcome is still undecided, the round's counters are
# examined (the first part of the inner condition is not visible in this
# listing).
696 if (!defined $success)
699 $thisround_succeeded == 0 &&
700 ($thisround_failed == 0 || $thisround_failed > 4))
702 my $message = "stop because $thisround_failed tasks failed and none succeeded";
703 Log (undef, $message);
# Undecided outcome: run the next level.
712 goto ONELEVEL if !defined $success;
# Wrap-up: release the allocation, collate the task outputs and record the
# final job state.
715 release_allocation();
718 $Job->{'output'} = &collate_output();
719 $Job->{'running'} = 0;
# The job counts as successful only when there is an output and $success is
# true.
720 $Job->{'success'} = $Job->{'output'} && $success;
721 $Job->{'finished_at'} = gmtime;
722 $Job->save if $job_has_uuid;
# When there is an output, its manifest text is fetched with the external
# "whget" command and a collection is created through the API. The failure
# message below reads $@, so this presumably runs inside an eval that is not
# visible in this listing.
724 if ($Job->{'output'})
727 my $manifest_text = capturex("whget", $Job->{'output'});
728 $arv->{'collections'}->{'create'}->execute('collection' => {
729 'uuid' => $Job->{'output'},
730 'manifest_text' => $manifest_text,
734 Log (undef, "Failed to register output manifest: $@");
738 Log (undef, "finish");
# update_progress_stats(): refresh the job's tasks_summary counters and log
# a status line. Takes no arguments. The "last updated" timestamp is always
# refreshed; everything else is skipped unless $progress_is_dirty is set,
# and the flag is cleared at the end.
745 sub update_progress_stats
747 $progress_stats_updated = time;
748 return if !$progress_is_dirty;
# running = slots that are neither free nor on hold.
749 my ($todo, $done, $running) = (scalar @jobstep_todo,
750 scalar @jobstep_done,
751 scalar @slot - scalar @freeslot - scalar @holdslot);
752 $Job->{'tasks_summary'} ||= {};
753 $Job->{'tasks_summary'}->{'todo'} = $todo;
754 $Job->{'tasks_summary'}->{'done'} = $done;
755 $Job->{'tasks_summary'}->{'running'} = $running;
# The job record is saved only when the job has a UUID.
756 $Job->save if $job_has_uuid;
757 Log (undef, "status: $done done, $running running, $todo todo");
758 $progress_is_dirty = 0;
# Reap one exited child, if any, without blocking; returns 0 when waitpid
# has nothing to report.
# NOTE(review): the sub header for this body is not visible in this listing.
765 my $pid = waitpid (-1, WNOHANG);
766 return 0 if $pid <= 0;
# Look up the slot and jobstep the pid belonged to through its %proc entry.
768 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
770 . $slot[$proc{$pid}->{slot}]->{cpu});
771 my $jobstepid = $proc{$pid}->{jobstep};
772 my $elapsed = time - $proc{$pid}->{time};
773 my $Jobstep = $jobstep[$jobstepid];
776 my $exitinfo = "exit $exitcode";
# The task record is reloaded and its success flag read back.
777 $Jobstep->{'arvados_task'}->reload;
778 my $success = $Jobstep->{'arvados_task'}->{success};
780 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
782 if (!defined $success) {
783 # task did not indicate one way or the other --> fail
784 $Jobstep->{'arvados_task'}->{success} = 0;
785 $Jobstep->{'arvados_task'}->save;
# Failure accounting for a reaped task. A failure attributed to the node
# (the step's node_fail flag, or a suspect node blamed below) must not count
# as a jobstep attempt, so the attempt counter is decremented exactly once,
# by the statement near the end of this section. Previously the suspect-node
# branch decremented it as well, taking two attempts off for one failure.
791 my $no_incr_attempts;
792 $no_incr_attempts = 1 if $Jobstep->{node_fail};
# Failures of steps that had already been attempted before are counted
# separately.
795 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
797 # Check for signs of a failed or misconfigured node
798 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
799 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
800 # Don't count this against jobstep failure thresholds if this
801 # node is already suspected faulty and srun exited quickly
802 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
804 $Jobstep->{attempts} > 1) {
805 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
# Only flag the step here; the single decrement happens below.
806 $no_incr_attempts = 1;
809 ban_node_by_slot($proc{$pid}->{slot});
# The step goes back on the todo list.
812 push @jobstep_todo, $jobstepid;
813 Log ($jobstepid, "failure in $elapsed seconds");
815 --$Jobstep->{attempts} if $no_incr_attempts;
816 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: count it, clear the node's losing streak and hold time, and
# move the step to the done list.
820 ++$thisround_succeeded;
821 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
822 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
823 push @jobstep_done, $jobstepid;
824 Log ($jobstepid, "success in $elapsed seconds");
# Record the exit code and finish time, then flush what is left of the
# task's captured output.
826 $Jobstep->{exitcode} = $exitcode;
827 $Jobstep->{finishtime} = time;
828 process_stderr ($jobstepid, $success);
829 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the task's pipe and give its slot back to the free list.
831 close $reader{$jobstepid};
832 delete $reader{$jobstepid};
833 delete $slot[$proc{$pid}->{slot}]->{pid};
834 push @freeslot, $proc{$pid}->{slot};
# Ask the API for tasks created by the task that just finished, ordered by
# qsequence.
838 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
840 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
842 'order' => 'qsequence'
# Each returned task becomes a new jobstep; its level is the task's
# 'sequence' value.
844 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
846 'level' => $arvados_task->{'sequence'},
848 'arvados_task' => $arvados_task
850 push @jobstep, $jobstep;
851 push @jobstep_todo, $#jobstep;
854 $progress_is_dirty = 1;
# Periodic check of child processes against SLURM's own view of the running
# steps. Two rate limits apply: the kill list is examined at most every
# 4 seconds and squeue is consulted at most every 60 seconds.
# NOTE(review): the sub header for this body is not visible in this listing.
861 # return if the kill list was checked <4 seconds ago
862 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
866 $squeue_kill_checked = time;
868 # use killem() on procs whose killtime is reached
871 if (exists $proc{$_}->{killtime}
872 && $proc{$_}->{killtime} <= time)
878 # return if the squeue was checked <60 seconds ago
879 if (defined $squeue_checked && $squeue_checked > time - 60)
883 $squeue_checked = time;
887 # here is an opportunity to check for mysterious problems with local procs
891 # get a list of steps still running
# "echo ok" runs only when squeue succeeds, so a final "ok" entry marks a
# complete listing.
892 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
894 if ($squeue[-1] ne "ok")
900 # which of my jobsteps are running, according to squeue?
# Lines look like "<jobid>.<stepid> <name>"; only those whose job id equals
# SLURM_JOBID are considered.
904 if (/^(\d+)\.(\d+) (\S+)/)
906 if ($1 eq $ENV{SLURM_JOBID})
913 # which of my active child procs (>60s old) were not mentioned by squeue?
916 if ($proc{$_}->{time} < time - 60
917 && !exists $ok{$proc{$_}->{jobstepname}}
918 && !exists $proc{$_}->{killtime})
920 # kill this proc if it hasn't exited in 30 seconds
921 $proc{$_}->{killtime} = time + 30;
# release_allocation(): log the release and cancel the SLURM job named by
# SLURM_JOBID with scancel. Any condition guarding these statements is not
# visible in this listing.
927 sub release_allocation
931 Log (undef, "release job allocation");
932 system "scancel $ENV{SLURM_JOBID}";
# Drain every task pipe in chunks of up to 8192 bytes, appending what was
# read to the step's stderr buffer and processing complete lines as they
# arrive.
# NOTE(review): the sub header for this body is not visible in this listing.
940 foreach my $job (keys %reader)
943 while (0 < sysread ($reader{$job}, $buf, 8192))
# With CRUNCH_DEBUG set, the raw data is echoed to STDERR as well.
945 print STDERR $buf if $ENV{CRUNCH_DEBUG};
946 $jobstep[$job]->{stderr} .= $buf;
947 preprocess_stderr ($job);
# Once the buffer exceeds 16384 bytes, its first 8192 bytes are discarded.
948 if (length ($jobstep[$job]->{stderr}) > 16384)
950 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr(): consume complete lines from the front of a jobstep's
# stderr buffer, log each one, and react to known srun error messages.
959 sub preprocess_stderr
# Each newline-terminated line is cut from the front of the buffer.
963 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
965 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
966 Log ($job, "stderr $line");
# An expired or unconfirmable allocation triggers a freeze request.
967 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
969 $main::please_freeze = 1;
# A node failure, or an inability to create the job step, is blamed on the
# node: the step is flagged node_fail and the node of its slot is banned.
971 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
972 $jobstep[$job]->{node_fail} = 1;
973 ban_node_by_slot($jobstep[$job]->{slotindex});
# Flush a jobstep's captured output: complete lines go through
# preprocess_stderr(), and whatever remains in the buffer is logged line by
# line.
# NOTE(review): the sub header for this body is not visible in this listing
# (presumably process_stderr, which is called elsewhere in this file).
983 preprocess_stderr ($job);
986 Log ($job, "stderr $_");
987 } split ("\n", $jobstep[$job]->{stderr});
# Combine the outputs of the successful tasks into a single job output,
# store it on the job record and log it.
# NOTE(review): the sub header for this body is not visible in this listing
# (presumably collate_output, which is called elsewhere in this file).
993 my $whc = Warehouse->new;
994 Log (undef, "collate");
995 $whc->write_start (1);
# Tasks without an output, without a true success flag, or with a non-zero
# exit code are skipped.
999 next if (!exists $_->{'arvados_task'}->{output} ||
1000 !$_->{'arvados_task'}->{'success'} ||
1001 $_->{'exitcode'} != 0);
1002 my $output = $_->{'arvados_task'}->{output};
# An output that does not look like a 32-hex-digit locator (with optional
# "+" hints) is written out as literal data.
1003 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1005 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1006 $whc->write_data ($output);
# With a single jobstep, its locator is used as the job output as is.
1008 elsif (@jobstep == 1)
1010 $joboutput = $output;
# Otherwise the block named by the locator is fetched and its content
# appended.
1013 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1015 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1016 $whc->write_data ($outblock);
# A failed fetch leaves a marker line in the output instead.
1020 my $errstr = $whc->errstr;
1021 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1025 $joboutput = $whc->write_finish if !defined $joboutput;
1028 Log (undef, "output $joboutput");
1029 $Job->{'output'} = $joboutput;
1030 $Job->save if $job_has_uuid;
1034 Log (undef, "output undef");
# Signal escalation for one process: start with signal 2, move to 15 once 2
# was sent more than 4 seconds ago, and to 9 once 15 was sent more than
# 4 seconds ago. A signal that has not yet been sent to this pid is sent and
# timestamped.
# NOTE(review): the sub header and the loop over pids are not visible in
# this listing (presumably killem, which is called elsewhere in this file).
1044 my $sig = 2; # SIGINT first
1045 if (exists $proc{$_}->{"sent_$sig"} &&
1046 time - $proc{$_}->{"sent_$sig"} > 4)
1048 $sig = 15; # SIGTERM if SIGINT doesn't work
1050 if (exists $proc{$_}->{"sent_$sig"} &&
1051 time - $proc{$_}->{"sent_$sig"} > 4)
1053 $sig = 9; # SIGKILL if SIGTERM doesn't work
1055 if (!exists $proc{$_}->{"sent_$sig"})
1057 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1059 select (undef, undef, undef, 0.1);
1062 kill $sig, $_; # srun wants two SIGINT to really interrupt
# Remember when this signal was sent and how long the process had been
# running by then.
1064 $proc{$_}->{"sent_$sig"} = time;
1065 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set the bit for the current handle's file descriptor in $bits.
# NOTE(review): the enclosing sub and loop are not visible in this listing;
# presumably this builds a bit mask for select().
1075 vec($bits,fileno($_),1) = 1;
# Log($jobstep_id, $logmessage): write one log line to STDERR and, while
# $metastream is set, to the job's log stream. The line is made of the job
# id, this process's pid, and the two arguments.
1081 sub Log # ($jobstep_id, $logmessage)
# A message containing newlines is split into its lines (how each line is
# then handled is not visible in this listing).
1083 if ($_[1] =~ /\n/) {
1084 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then reselect the previous default handle.
1089 my $fh = select STDERR; $|=1; select $fh;
1090 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Every character outside the range space..octal 176 is replaced by a
# backslash followed by its three-digit octal code.
1091 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A UTC timestamp is computed only when it will be used: for the log stream
# or for a terminal.
1094 if ($metastream || -t STDERR) {
1095 my @gmtime = gmtime;
1096 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1097 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp only when it is a terminal; the log stream
# always gets it.
1099 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1101 return if !$metastream;
1102 $metastream->write_data ($datetime . " " . $message);
# Fatal-error path: log the message together with the caller's file and
# line, then save as much state as possible.
# NOTE(review): the sub header for this body is not visible in this listing
# (presumably croak, which is called throughout this file).
1108 my ($package, $file, $line) = caller;
1109 my $message = "@_ at $file line $line\n";
1110 Log (undef, $message);
# Freezing and output collation happen only while jobsteps remain to be
# done.
1111 freeze() if @jobstep_todo;
1112 collate_output() if @jobstep_todo;
# The log stream is saved when it is still open.
1114 save_meta() if $metastream;
# Mark the job record as not running, unsuccessful and finished now. Does
# nothing for jobs without a UUID.
# NOTE(review): the sub header and any following save call are not visible
# in this listing.
1121 return if !$job_has_uuid;
1123 $Job->{'running'} = 0;
1124 $Job->{'success'} = 0;
1125 $Job->{'finished_at'} = gmtime;
# Store the log stream and record its locator on the job. With a true
# argument a copy of the stream is used, so logging can continue
# afterwards; otherwise $metastream is cleared after the key is obtained.
# NOTE(review): the sub header for this body is not visible in this listing
# (presumably save_meta, which is called elsewhere in this file).
1132 my $justcheckpoint = shift; # false if this will be the last meta saved
1133 my $m = $metastream;
1134 $m = $m->copy if $justcheckpoint;
1136 my $loglocator = $m->as_key;
1137 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1138 Log (undef, "meta key is $loglocator");
1139 $Job->{'log'} = $loglocator;
# The job record is saved only when the job has a UUID.
1140 $Job->save if $job_has_uuid;
# freeze_if_want_freeze(@pids): when a freeze has been requested, release
# the allocation and kill and reap the child processes. The pids passed in
# are added to %proc so that they are killed too. What happens after the
# children are reaped is not visible in this listing.
1144 sub freeze_if_want_freeze
1146 if ($main::please_freeze)
1148 release_allocation();
1151 # kill some srun procs before freeze+stop
# Each extra pid gets an empty %proc entry (replacing any existing one).
1152 map { $proc{$_} = {} } @_;
1155 killem (keys %proc);
1156 select (undef, undef, undef, 0.1);
# Collect every child that has exited so far and drop it from %proc.
1158 while (($died = waitpid (-1, WNOHANG)) > 0)
1160 delete $proc{$died};
# Freezing only logs that it is not implemented.
# NOTE(review): the enclosing sub header is not visible in this listing
# (presumably freeze, which is called elsewhere in this file).
1175 Log (undef, "Freeze not implemented");
# Restore job state from a frozen snapshot identified by $key.
# NOTE(review): the sub header is not visible in this listing (presumably
# thaw, which is called elsewhere in this file). The body starts by calling
# croak("Thaw not implemented"); whether croak returns is not visible here,
# so the rest may be unreachable.
1182 croak ("Thaw not implemented");
1186 Log (undef, "thaw from $key");
1191 @jobstep_tomerge = ();
1192 $jobstep_tomerge_level = 0;
# The snapshot is read as a stream; $key is a comma-separated list of
# hashes.
1195 my $stream = new Warehouse::Stream ( whc => $whc,
1196 hash => [split (",", $key)] );
# Records are separated by blank lines ("\n\n").
1198 while (my $dataref = $stream->read_until (undef, "\n\n"))
# A record starting with "job " carries key=value lines describing the job.
1200 if ($$dataref =~ /^job /)
1202 foreach (split ("\n", $$dataref))
1204 my ($k, $v) = split ("=", $_, 2);
1205 $frozenjob->{$k} = freezeunquote ($v);
# A "merge <level> <list>" record restores the merge level and a quoted,
# newline-separated list.
1210 if ($$dataref =~ /^merge (\d+) (.*)/)
1212 $jobstep_tomerge_level = $1;
1214 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
# The key=value lines below fill a jobstep, whose attempt count restarts
# at 0.
1219 foreach (split ("\n", $$dataref))
1221 my ($k, $v) = split ("=", $_, 2);
1222 $Jobstep->{$k} = freezeunquote ($v) if $k;
1224 $Jobstep->{attempts} = 0;
1225 push @jobstep, $Jobstep;
# A step whose exit code is the string "0" is pushed on the done list.
1227 if ($Jobstep->{exitcode} eq "0")
1229 push @jobstep_done, $#jobstep;
1233 push @jobstep_todo, $#jobstep;
# Copy the script attributes from the snapshot to the job, and save the job
# when it has a UUID.
1237 foreach (qw (script script_version script_parameters))
1239 $Job->{$_} = $frozenjob->{$_};
1241 $Job->save if $job_has_uuid;
# Undo backslash quoting in $s: "\n" (backslash, n) becomes a newline, and
# any other backslash-escaped character becomes the character itself.
# NOTE(review): the enclosing sub header is not visible in this listing
# (presumably freezeunquote, which is called elsewhere in this file).
1257 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run a command, through srun when SLURM is available. Arguments, in order:
# an array ref of srun arguments, an array ref holding the command itself,
# and an optional hash ref of options. A further value, $stdin, is used
# below; its assignment is not visible in this listing.
# NOTE(review): the sub header is not visible in this listing (presumably
# srun, which is called elsewhere in this file).
1264 my $srunargs = shift;
1265 my $execargs = shift;
1266 my $opts = shift || {};
# Without SLURM the srun arguments are dropped and the command runs
# directly.
1268 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# With CRUNCH_DEBUG set, the command is echoed to STDERR, with arguments
# containing a space wrapped in single quotes.
1269 print STDERR (join (" ",
1270 map { / / ? "'$_'" : $_ }
1273 if $ENV{CRUNCH_DEBUG};
# When $stdin is defined, open "-|" forks a process whose STDOUT becomes
# this process's STDIN; the print and close below are presumably the forked
# side (the branch test is not visible in this listing).
1275 if (defined $stdin) {
1276 my $child = open STDIN, "-|";
1277 defined $child or die "no fork: $!";
1279 print $stdin or die $!;
1280 close STDOUT or die $!;
# With the "fork" option the command runs via system() and its status is
# returned.
1285 return system (@$args) if $opts->{fork};
# These two lines report a failed exec (the exec call itself is not visible
# in this listing).
1288 warn "ENV size is ".length(join(" ",%ENV));
1289 die "exec failed: $!: @$args";
# ban_node_by_slot(): put the node that owns the slot with index $slotid on
# hold for 60 seconds from now, count the hold, and log it. The assignment
# of $slotid is not visible in this listing; callers pass a slot index.
1293 sub ban_node_by_slot {
1294 # Don't start any new jobsteps on this node for 60 seconds
1296 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1297 $slot[$slotid]->{node}->{hold_count}++;
1298 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Installer program. It unpacks a tar archive into the directory named by
# CRUNCH_SRC, runs the tree's install script, and records the installed
# commit. Its inputs come from the environment: CRUNCH_SRC,
# CRUNCH_SRC_COMMIT, CRUNCH_SRC_URL and CRUNCH_INSTALL.
1304 # checkout-and-build
1308 my $destdir = $ENV{"CRUNCH_SRC"};
1309 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1310 my $repo = $ENV{"CRUNCH_SRC_URL"};
# A lock file next to the destination directory is opened for writing.
1312 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# The installed commit is recorded as the target of the symlink
# "$destdir.commit"; this test detects that the requested commit is already
# in place.
1314 if (readlink ("$destdir.commit") eq $commit) {
# From here on, output goes to "$destdir.log".
# NOTE(review): these open calls are unchecked.
1318 unlink "$destdir.commit";
1319 open STDOUT, ">", "$destdir.log";
1320 open STDERR, ">&STDOUT";
# tar reads the archive from a pipe and extracts it into $destdir.
1323 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1326 die "'tar -C $destdir -xf -' exited $?: $!";
1330 chomp ($pwd = `pwd`);
1331 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# tests/autotests.sh is run only when there is no install.sh; otherwise
# install.sh is run when it exists. Both receive the install directory as
# their argument.
1333 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1335 shell_or_die ("./tests/autotests.sh", $install_dir);
1336 } elsif (-e "./install.sh") {
1337 shell_or_die ("./install.sh", $install_dir);
# Record the commit: the symlink is created under a temporary name and
# renamed into place.
1341 unlink "$destdir.commit.new";
1342 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1343 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# With DEBUG set in the environment, the arguments are echoed to STDERR
# first. The final line is the failure branch of a call that is not visible
# in this listing; it dies with the arguments, $! and the status in hex.
# NOTE(review): the enclosing sub header is not visible in this listing
# (presumably shell_or_die, which is called elsewhere in this file).
1352 if ($ENV{"DEBUG"}) {
1353 print STDERR "@_\n";
1356 or die "@_ failed: $! exit 0x".sprintf("%x",$?);