2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
77 $ENV{"TMPDIR"} ||= "/tmp";
78 unless (defined $ENV{"CRUNCH_TMP"}) {
79 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
80 if ($ENV{"USER"} ne "crunch" && $< != 0) {
81 # use a tmp dir unique for my uid
82 $ENV{"CRUNCH_TMP"} .= "-$<";
85 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
86 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
87 mkdir ($ENV{"JOB_WORK"});
94 GetOptions('force-unlock' => \$force_unlock,
95 'git-dir=s' => \$git_dir,
97 'job-api-token=s' => \$job_api_token,
98 'resume-stash=s' => \$resume_stash,
101 if (defined $job_api_token) {
102 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
105 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
106 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
107 my $local_job = !$job_has_uuid;
116 $main::ENV{CRUNCH_DEBUG} = 1;
120 $main::ENV{CRUNCH_DEBUG} = 0;
125 my $arv = Arvados->new;
126 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
128 $metastream->write_start('log.txt');
130 my $User = $arv->{'users'}->{'current'}->execute;
138 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
139 if (!$force_unlock) {
140 if ($Job->{'is_locked_by_uuid'}) {
141 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# Refuse to run a job that already carries a result. The 'success' field is
# null (undef) until a result is recorded, so definedness is the right test:
# "ne undef" is a string comparison that stringifies undef, emits a warning,
# and treats an empty-string value as if it were unset.
143 if (defined $Job->{'success'}) {
144 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
146 if ($Job->{'running'}) {
147 croak("Job 'running' flag is already set");
149 if ($Job->{'started_at'}) {
150 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
156 $Job = JSON::decode_json($jobspec);
160 map { croak ("No $_ specified") unless $Job->{$_} }
161 qw(script script_version script_parameters);
164 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
165 $Job->{'started_at'} = gmtime;
167 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
171 $job_id = $Job->{'uuid'};
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
180 Log (undef, "check slurm allocation");
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g., "4(x3),2,4(x2)")
187 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188 push @sinfo, "$localcpus localhost";
190 if (exists $ENV{SLURM_NODELIST})
192 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
196 my ($ncpus, $slurm_nodelist) = split;
197 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
200 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
203 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
206 foreach (split (",", $ranges))
219 push @nodelist, map {
221 $n =~ s/\[[-,\d]+\]/$_/;
228 push @nodelist, $nodelist;
231 foreach my $nodename (@nodelist)
233 Log (undef, "node $nodename - $ncpus slots");
234 my $node = { name => $nodename,
238 foreach my $cpu (1..$ncpus)
240 push @slot, { node => $node,
244 push @node, @nodelist;
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259 # Claim this job, and make sure nobody else does
261 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
262 $Job->{'started_at'} = gmtime;
263 $Job->{'running'} = 1;
264 $Job->{'success'} = undef;
265 $Job->{'tasks_summary'} = { 'failed' => 0,
# Save the claim and confirm that this process really holds the lock.
# UUIDs are strings (e.g. "x-y-z"), so they must be compared with eq: under
# == both operands are converted to numbers, non-numeric UUIDs all become 0,
# and a lock held by somebody else would go unnoticed.
270 unless ($Job->save() && $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
271 croak("Error while updating / locking job");
277 Log (undef, "start");
278 $SIG{'INT'} = sub { $main::please_freeze = 1; };
279 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
280 $SIG{'TERM'} = \&croak;
281 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
282 $SIG{'ALRM'} = sub { $main::please_info = 1; };
283 $SIG{'CONT'} = sub { $main::please_continue = 1; };
284 $main::please_freeze = 0;
285 $main::please_info = 0;
286 $main::please_continue = 0;
287 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
305 if (defined $Job->{thawedfromkey})
307 thaw ($Job->{thawedfromkey});
311 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312 'job_uuid' => $Job->{'uuid'},
317 push @jobstep, { 'level' => 0,
319 'arvados_task' => $first_task,
321 push @jobstep_todo, 0;
328 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
333 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
339 $build_script = <DATA>;
341 Log (undef, "Install revision ".$Job->{script_version});
342 my $nodelist = join(",", @node);
344 # Clean out crunch_tmp/work and crunch_tmp/opt
346 my $cleanpid = fork();
349 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
350 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
355 last if $cleanpid == waitpid (-1, WNOHANG);
356 freeze_if_want_freeze ($cleanpid);
357 select (undef, undef, undef, 0.1);
359 Log (undef, "Clean-work-dir exited $?");
361 # Install requested code version
364 my @srunargs = ("srun",
365 "--nodelist=$nodelist",
366 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
368 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
369 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
370 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
374 my $treeish = $Job->{'script_version'};
375 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
376 # TODO: let script_version specify the repository instead of expecting
377 # the parent process to figure it out.
378 $ENV{"CRUNCH_SRC_URL"} = $repo;
380 # Create/update our clone of the remote git repo
382 if (!-d $ENV{"CRUNCH_SRC"}) {
383 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
384 or croak ("git clone $repo failed: exit ".($?>>8));
385 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
387 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
389 # If this looks like a subversion r#, look for it in git-svn commit messages
391 if ($treeish =~ m{^\d{1,4}$}) {
392 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
394 if ($gitlog =~ /^[a-f0-9]{40}$/) {
396 Log (undef, "Using commit $commit for script_version $treeish");
400 # If that didn't work, try asking git to look it up as a tree-ish.
402 if (!defined $commit) {
404 my $cooked_treeish = $treeish;
405 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
406 # Looks like a git branch name -- make sure git knows it's
407 # relative to the remote repo
408 $cooked_treeish = "origin/$treeish";
411 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
413 if ($found =~ /^[0-9a-f]{40}$/s) {
415 if ($commit ne $treeish) {
416 # Make sure we record the real commit id in the database,
417 # frozentokey, logs, etc. -- instead of an abbreviation or a
418 # branch name which can become ambiguous or point to a
419 # different commit in the future.
420 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
421 Log (undef, "Using commit $commit for tree-ish $treeish");
422 if ($commit ne $treeish) {
423 $Job->{'script_version'} = $commit;
424 !$job_has_uuid or $Job->save() or croak("Error while updating job");
430 if (defined $commit) {
431 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
432 @execargs = ("sh", "-c",
433 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
434 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
437 croak ("could not figure out commit id for $treeish");
440 my $installpid = fork();
441 if ($installpid == 0)
443 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
448 last if $installpid == waitpid (-1, WNOHANG);
449 freeze_if_want_freeze ($installpid);
450 select (undef, undef, undef, 0.1);
452 Log (undef, "Install exited $?");
457 foreach (qw (script script_version script_parameters runtime_constraints))
461 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
463 foreach (split (/\n/, $Job->{knobs}))
465 Log (undef, "knob " . $_);
476 my $thisround_succeeded = 0;
477 my $thisround_failed = 0;
478 my $thisround_failed_multiple = 0;
480 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
481 or $a <=> $b } @jobstep_todo;
482 my $level = $jobstep[$jobstep_todo[0]]->{level};
483 Log (undef, "start level $level");
488 my @freeslot = (0..$#slot);
491 my $progress_is_dirty = 1;
492 my $progress_stats_updated = 0;
494 update_progress_stats();
499 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
501 my $id = $jobstep_todo[$todo_ptr];
502 my $Jobstep = $jobstep[$id];
503 if ($Jobstep->{level} != $level)
507 if ($Jobstep->{attempts} > 2)
509 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
514 pipe $reader{$id}, "writer" or croak ($!);
515 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
516 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
518 my $childslot = $freeslot[0];
519 my $childnode = $slot[$childslot]->{node};
520 my $childslotname = join (".",
521 $slot[$childslot]->{node}->{name},
522 $slot[$childslot]->{cpu});
523 my $childpid = fork();
526 $SIG{'INT'} = 'DEFAULT';
527 $SIG{'QUIT'} = 'DEFAULT';
528 $SIG{'TERM'} = 'DEFAULT';
530 foreach (values (%reader))
534 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
535 open(STDOUT,">&writer");
536 open(STDERR,">&writer");
541 delete $ENV{"GNUPGHOME"};
542 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
543 $ENV{"TASK_QSEQUENCE"} = $id;
544 $ENV{"TASK_SEQUENCE"} = $level;
545 $ENV{"JOB_SCRIPT"} = $Job->{script};
546 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
547 $param =~ tr/a-z/A-Z/;
548 $ENV{"JOB_PARAMETER_$param"} = $value;
550 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
551 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
552 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
553 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
554 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
560 "--nodelist=".$childnode->{name},
561 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
562 "--job-name=$job_id.$id.$$",
564 my @execargs = qw(sh);
565 my $build_script_to_send = "";
567 "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
568 ."&& cd $ENV{CRUNCH_TMP} ";
571 $build_script_to_send = $build_script;
575 $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
576 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
577 $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
579 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
580 my @execargs = ('bash', '-c', $command);
581 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
585 if (!defined $childpid)
592 $proc{$childpid} = { jobstep => $id,
595 jobstepname => "$job_id.$id.$childpid",
597 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
598 $slot[$childslot]->{pid} = $childpid;
600 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
601 Log ($id, "child $childpid started on $childslotname");
602 $Jobstep->{attempts} ++;
603 $Jobstep->{starttime} = time;
604 $Jobstep->{node} = $childnode->{name};
605 $Jobstep->{slotindex} = $childslot;
606 delete $Jobstep->{stderr};
607 delete $Jobstep->{finishtime};
609 splice @jobstep_todo, $todo_ptr, 1;
612 $progress_is_dirty = 1;
616 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
618 last THISROUND if $main::please_freeze;
619 if ($main::please_info)
621 $main::please_info = 0;
625 update_progress_stats();
633 update_progress_stats();
634 select (undef, undef, undef, 0.1);
636 elsif (time - $progress_stats_updated >= 30)
638 update_progress_stats();
640 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
641 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
643 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
644 .($thisround_failed+$thisround_succeeded)
645 .") -- giving up on this round";
646 Log (undef, $message);
650 # move slots from freeslot to holdslot (or back to freeslot) if necessary
651 for (my $i=$#freeslot; $i>=0; $i--) {
652 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
653 push @holdslot, (splice @freeslot, $i, 1);
656 for (my $i=$#holdslot; $i>=0; $i--) {
657 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
658 push @freeslot, (splice @holdslot, $i, 1);
662 # give up if no nodes are succeeding
663 if (!grep { $_->{node}->{losing_streak} == 0 &&
664 $_->{node}->{hold_count} < 4 } @slot) {
665 my $message = "Every node has failed -- giving up on this round";
666 Log (undef, $message);
673 push @freeslot, splice @holdslot;
674 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
677 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
680 if ($main::please_continue) {
681 $main::please_continue = 0;
684 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
689 update_progress_stats();
690 select (undef, undef, undef, 0.1);
691 killem (keys %proc) if $main::please_freeze;
695 update_progress_stats();
696 freeze_if_want_freeze();
699 if (!defined $success)
702 $thisround_succeeded == 0 &&
703 ($thisround_failed == 0 || $thisround_failed > 4))
705 my $message = "stop because $thisround_failed tasks failed and none succeeded";
706 Log (undef, $message);
715 goto ONELEVEL if !defined $success;
718 release_allocation();
721 $Job->{'output'} = &collate_output();
722 $Job->{'running'} = 0;
723 $Job->{'success'} = $Job->{'output'} && $success;
724 $Job->{'finished_at'} = gmtime;
725 $Job->save if $job_has_uuid;
727 if ($Job->{'output'})
730 my $manifest_text = capturex("whget", $Job->{'output'});
731 $arv->{'collections'}->{'create'}->execute('collection' => {
732 'uuid' => $Job->{'output'},
733 'manifest_text' => $manifest_text,
737 Log (undef, "Failed to register output manifest: $@");
741 Log (undef, "finish");
# update_progress_stats: publish the current task counts for this job.
#
# Records the time of this call in $progress_stats_updated, then does nothing
# more unless $progress_is_dirty is set. When it is set, the todo/done/running
# counts are stored in $Job->{'tasks_summary'}, the job record is saved if the
# job has a UUID, a status line is logged, and $progress_is_dirty is cleared.
748 sub update_progress_stats
# The timestamp is refreshed even when there is nothing new to report.
750 $progress_stats_updated = time;
751 return if !$progress_is_dirty;
# Running tasks are counted as the slots that are neither free nor on hold.
752 my ($todo, $done, $running) = (scalar @jobstep_todo,
753 scalar @jobstep_done,
754 scalar @slot - scalar @freeslot - scalar @holdslot);
755 $Job->{'tasks_summary'} ||= {};
756 $Job->{'tasks_summary'}->{'todo'} = $todo;
757 $Job->{'tasks_summary'}->{'done'} = $done;
758 $Job->{'tasks_summary'}->{'running'} = $running;
# Jobs without a UUID are not saved from here.
759 $Job->save if $job_has_uuid;
760 Log (undef, "status: $done done, $running running, $todo todo");
761 $progress_is_dirty = 0;
768 my $pid = waitpid (-1, WNOHANG);
769 return 0 if $pid <= 0;
771 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
773 . $slot[$proc{$pid}->{slot}]->{cpu});
774 my $jobstepid = $proc{$pid}->{jobstep};
775 my $elapsed = time - $proc{$pid}->{time};
776 my $Jobstep = $jobstep[$jobstepid];
779 my $exitinfo = "exit $exitcode";
780 $Jobstep->{'arvados_task'}->reload;
781 my $success = $Jobstep->{'arvados_task'}->{success};
783 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
785 if (!defined $success) {
786 # task did not indicate one way or the other --> fail
787 $Jobstep->{'arvados_task'}->{success} = 0;
788 $Jobstep->{'arvados_task'}->save;
794 my $no_incr_attempts;
795 $no_incr_attempts = 1 if $Jobstep->{node_fail};
798 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
800 # Check for signs of a failed or misconfigured node
801 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
802 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
803 # Don't count this against jobstep failure thresholds if this
804 # node is already suspected faulty and srun exited quickly
805 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
807 $Jobstep->{attempts} > 1) {
808 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
809 $no_incr_attempts = 1;
811 ban_node_by_slot($proc{$pid}->{slot});
814 push @jobstep_todo, $jobstepid;
815 Log ($jobstepid, "failure in $elapsed seconds");
817 --$Jobstep->{attempts} if $no_incr_attempts;
818 $Job->{'tasks_summary'}->{'failed'}++;
822 ++$thisround_succeeded;
823 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
824 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
825 push @jobstep_done, $jobstepid;
826 Log ($jobstepid, "success in $elapsed seconds");
828 $Jobstep->{exitcode} = $exitcode;
829 $Jobstep->{finishtime} = time;
830 process_stderr ($jobstepid, $success);
831 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
833 close $reader{$jobstepid};
834 delete $reader{$jobstepid};
835 delete $slot[$proc{$pid}->{slot}]->{pid};
836 push @freeslot, $proc{$pid}->{slot};
840 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
842 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
844 'order' => 'qsequence'
846 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
848 'level' => $arvados_task->{'sequence'},
850 'arvados_task' => $arvados_task
852 push @jobstep, $jobstep;
853 push @jobstep_todo, $#jobstep;
856 $progress_is_dirty = 1;
863 # return if the kill list was checked <4 seconds ago
864 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
868 $squeue_kill_checked = time;
870 # use killem() on procs whose killtime is reached
873 if (exists $proc{$_}->{killtime}
874 && $proc{$_}->{killtime} <= time)
880 # return if the squeue was checked <60 seconds ago
881 if (defined $squeue_checked && $squeue_checked > time - 60)
885 $squeue_checked = time;
889 # here is an opportunity to check for mysterious problems with local procs
893 # get a list of steps still running
894 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
896 if ($squeue[-1] ne "ok")
902 # which of my jobsteps are running, according to squeue?
906 if (/^(\d+)\.(\d+) (\S+)/)
908 if ($1 eq $ENV{SLURM_JOBID})
915 # which of my active child procs (>60s old) were not mentioned by squeue?
918 if ($proc{$_}->{time} < time - 60
919 && !exists $ok{$proc{$_}->{jobstepname}}
920 && !exists $proc{$_}->{killtime})
922 # kill this proc if it hasn't exited in 30 seconds
923 $proc{$_}->{killtime} = time + 30;
# release_allocation: log the release and run scancel on the SLURM job named
# by $ENV{SLURM_JOBID}.
# NOTE(review): any guard for running without SLURM is in lines not visible
# here -- confirm before relying on this being safe outside an allocation.
929 sub release_allocation
933 Log (undef, "release job allocation");
934 system "scancel $ENV{SLURM_JOBID}";
942 foreach my $job (keys %reader)
945 while (0 < sysread ($reader{$job}, $buf, 8192))
947 print STDERR $buf if $ENV{CRUNCH_DEBUG};
948 $jobstep[$job]->{stderr} .= $buf;
949 preprocess_stderr ($job);
950 if (length ($jobstep[$job]->{stderr}) > 16384)
952 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: consume the complete lines buffered in
# $jobstep[$job]->{stderr}, logging each one and reacting to srun errors.
# NOTE(review): the assignments of $job and $line are in lines not visible
# here; presumably $job is the argument and $line is the captured line --
# confirm.
961 sub preprocess_stderr
# Only text up to a newline is taken; a trailing partial line stays buffered.
965 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Drop the line and its newline from the front of the buffer.
967 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
968 Log ($job, "stderr $line");
# srun reports that the allocation is gone: set the freeze request flag.
969 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
971 $main::please_freeze = 1;
# srun reports a node-level failure: flag the jobstep and put its node on hold.
973 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
974 $jobstep[$job]->{node_fail} = 1;
975 ban_node_by_slot($jobstep[$job]->{slotindex});
985 preprocess_stderr ($job);
988 Log ($job, "stderr $_");
989 } split ("\n", $jobstep[$job]->{stderr});
995 my $whc = Warehouse->new;
996 Log (undef, "collate");
997 $whc->write_start (1);
1001 next if (!exists $_->{'arvados_task'}->{output} ||
1002 !$_->{'arvados_task'}->{'success'} ||
1003 $_->{'exitcode'} != 0);
1004 my $output = $_->{'arvados_task'}->{output};
1005 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1007 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1008 $whc->write_data ($output);
1010 elsif (@jobstep == 1)
1012 $joboutput = $output;
1015 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1017 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1018 $whc->write_data ($outblock);
1022 my $errstr = $whc->errstr;
1023 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1027 $joboutput = $whc->write_finish if !defined $joboutput;
1030 Log (undef, "output $joboutput");
1031 $Job->{'output'} = $joboutput;
1032 $Job->save if $job_has_uuid;
1036 Log (undef, "output undef");
1046 my $sig = 2; # SIGINT first
1047 if (exists $proc{$_}->{"sent_$sig"} &&
1048 time - $proc{$_}->{"sent_$sig"} > 4)
1050 $sig = 15; # SIGTERM if SIGINT doesn't work
1052 if (exists $proc{$_}->{"sent_$sig"} &&
1053 time - $proc{$_}->{"sent_$sig"} > 4)
1055 $sig = 9; # SIGKILL if SIGTERM doesn't work
1057 if (!exists $proc{$_}->{"sent_$sig"})
1059 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1061 select (undef, undef, undef, 0.1);
1064 kill $sig, $_; # srun wants two SIGINT to really interrupt
1066 $proc{$_}->{"sent_$sig"} = time;
1067 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1077 vec($bits,fileno($_),1) = 1;
# Log: write one log record to STDERR and, while it exists, to $metastream.
#
# Arguments are ($jobstep_id, $logmessage). The record is built as
# "<job id> <pid> <jobstep id> <message>", and every character outside the
# range space..~ is replaced by a backslash followed by its three-digit octal
# code.
# NOTE(review): what happens to each line of a multi-line message, and where
# the line terminator is appended, is in lines not visible here.
1083 sub Log # ($jobstep_id, $logmessage)
# Messages containing newlines are split into individual lines.
1085 if ($_[1] =~ /\n/) {
1086 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1091 my $fh = select STDERR; $|=1; select $fh;
1092 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1093 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A timestamp is only computed when $metastream or a terminal will use it.
1096 if ($metastream || -t STDERR) {
1097 my @gmtime = gmtime;
1098 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1099 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# On STDERR the timestamp prefix is printed only when STDERR is a terminal.
1101 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1103 return if !$metastream;
1104 $metastream->write_data ($datetime . " " . $message);
1110 my ($package, $file, $line) = caller;
1111 my $message = "@_ at $file line $line\n";
1112 Log (undef, $message);
1113 freeze() if @jobstep_todo;
1114 collate_output() if @jobstep_todo;
1116 save_meta() if $metastream;
1123 return if !$job_has_uuid;
1125 $Job->{'running'} = 0;
1126 $Job->{'success'} = 0;
1127 $Job->{'finished_at'} = gmtime;
1134 my $justcheckpoint = shift; # false if this will be the last meta saved
1135 my $m = $metastream;
1136 $m = $m->copy if $justcheckpoint;
1138 my $loglocator = $m->as_key;
1139 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1140 Log (undef, "meta key is $loglocator");
1141 $Job->{'log'} = $loglocator;
1142 $Job->save if $job_has_uuid;
# freeze_if_want_freeze: act on a pending freeze request.
#
# The steps shown here run only when $main::please_freeze is set: the
# allocation is released, each pid passed as an argument gets an empty entry
# in %proc so that killem() covers it too, and children that have exited are
# reaped and removed from %proc.
# NOTE(review): the loop around killem() and everything after the reaping
# are in lines not visible here.
1146 sub freeze_if_want_freeze
1148 if ($main::please_freeze)
1150 release_allocation();
1153 # kill some srun procs before freeze+stop
# Register the caller's pids in %proc so the kill below includes them.
1154 map { $proc{$_} = {} } @_;
1157 killem (keys %proc);
# Sleep for 0.1 seconds before checking for exited children.
1158 select (undef, undef, undef, 0.1);
# Reap every child that has already exited, without blocking.
1160 while (($died = waitpid (-1, WNOHANG)) > 0)
1162 delete $proc{$died};
1177 Log (undef, "Freeze not implemented");
1184 croak ("Thaw not implemented");
1188 Log (undef, "thaw from $key");
1193 @jobstep_tomerge = ();
1194 $jobstep_tomerge_level = 0;
1197 my $stream = new Warehouse::Stream ( whc => $whc,
1198 hash => [split (",", $key)] );
1200 while (my $dataref = $stream->read_until (undef, "\n\n"))
1202 if ($$dataref =~ /^job /)
1204 foreach (split ("\n", $$dataref))
1206 my ($k, $v) = split ("=", $_, 2);
1207 $frozenjob->{$k} = freezeunquote ($v);
1212 if ($$dataref =~ /^merge (\d+) (.*)/)
1214 $jobstep_tomerge_level = $1;
1216 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1221 foreach (split ("\n", $$dataref))
1223 my ($k, $v) = split ("=", $_, 2);
1224 $Jobstep->{$k} = freezeunquote ($v) if $k;
1226 $Jobstep->{attempts} = 0;
1227 push @jobstep, $Jobstep;
1229 if ($Jobstep->{exitcode} eq "0")
1231 push @jobstep_done, $#jobstep;
1235 push @jobstep_todo, $#jobstep;
1239 foreach (qw (script script_version script_parameters))
1241 $Job->{$_} = $frozenjob->{$_};
1243 $Job->save if $job_has_uuid;
1259 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1266 my $srunargs = shift;
1267 my $execargs = shift;
1268 my $opts = shift || {};
1270 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1271 print STDERR (join (" ",
1272 map { / / ? "'$_'" : $_ }
1275 if $ENV{CRUNCH_DEBUG};
1277 if (defined $stdin) {
1278 my $child = open STDIN, "-|";
1279 defined $child or die "no fork: $!";
1281 print $stdin or die $!;
1282 close STDOUT or die $!;
1287 return system (@$args) if $opts->{fork};
1290 warn "ENV size is ".length(join(" ",%ENV));
1291 die "exec failed: $!: @$args";
# ban_node_by_slot: put the node that owns slot $slotid on hold.
#
# Sets the node's hold_until to 60 seconds from now, increments its
# hold_count, and logs the back-off.
# NOTE(review): the assignment of $slotid is in a line not visible here;
# presumably it is taken from the argument list -- confirm.
1295 sub ban_node_by_slot {
1296 # Don't start any new jobsteps on this node for 60 seconds
1298 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1299 $slot[$slotid]->{node}->{hold_count}++;
1300 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1306 # checkout-and-build
1310 my $destdir = $ENV{"CRUNCH_SRC"};
1311 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1312 my $repo = $ENV{"CRUNCH_SRC_URL"};
1314 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1316 if (readlink ("$destdir.commit") eq $commit) {
1320 unlink "$destdir.commit";
1321 open STDOUT, ">", "$destdir.log";
1322 open STDERR, ">&STDOUT";
1325 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1328 die "'tar -C $destdir -xf -' exited $?: $!";
1332 chomp ($pwd = `pwd`);
1333 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1335 if (-e "$destdir/crunch_scripts/install") {
1336 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1337 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1339 shell_or_die ("./tests/autotests.sh", $install_dir);
1340 } elsif (-e "./install.sh") {
1341 shell_or_die ("./install.sh", $install_dir);
1345 unlink "$destdir.commit.new";
1346 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1347 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1356 if ($ENV{"DEBUG"}) {
1357 print STDERR "@_\n";
1360 or die "@_ failed: $! exit 0x".sprintf("%x",$?);