2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
# Module imports and scratch-directory setup.
# POSIX ':sys_wait_h' provides WNOHANG for non-blocking waitpid;
# Fcntl constants are used below to put task stderr pipes in non-blocking mode.
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
# Per-job scratch space: CRUNCH_TMP holds clones/installs, CRUNCH_WORK holds
# per-task working dirs. These env vars are inherited by all child tasks.
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
# mkdir failure (e.g. dir already exists) is deliberately ignored here.
80 mkdir ($ENV{"CRUNCH_TMP"});
# Parse command-line options (see POD above for their meanings).
87 GetOptions('force-unlock' => \$force_unlock,
88 'git-dir=s' => \$git_dir,
90 'job-api-token=s' => \$job_api_token,
91 'resume-stash=s' => \$resume_stash,
# A token given on the command line overrides the inherited environment token.
94 if (defined $job_api_token) {
95 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# Running under SLURM iff both allocation variables are present.
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# --job is either an Arvados UUID (fetch from API) or inline JSON (local run).
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
108 $main::ENV{CRUNCH_DEBUG} = 1;
112 $main::ENV{CRUNCH_DEBUG} = 0;
# API client plus a Warehouse stream that accumulates the job log (log.txt),
# flushed to Keep by save_meta().
117 my $arv = Arvados->new;
118 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
120 $metastream->write_start('log.txt');
# UUID case: fetch the job record and refuse to run it if another dispatcher
# already owns it, unless --force-unlock was given.
129 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
130 $User = $arv->{'users'}->{'current'}->execute;
131 if (!$force_unlock) {
132 if ($Job->{'is_locked_by'}) {
133 croak("Job is locked: " . $Job->{'is_locked_by'});
# NOTE(review): `ne undef` is a string comparison against undef (warns under
# `use warnings` and is false for an empty-string success flag); this should
# be `if (defined $Job->{'success'})`.
135 if ($Job->{'success'} ne undef) {
136 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
138 if ($Job->{'running'}) {
139 croak("Job 'running' flag is already set");
141 if ($Job->{'started_at'}) {
142 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# JSON case: job spec supplied inline on the command line (local/dev run).
148 $Job = JSON::decode_json($jobspec);
152 map { croak ("No $_ specified") unless $Job->{$_} }
153 qw(script script_version script_parameters);
# Local jobs get a synthetic uuid: hostname-t<epoch>-p<pid>.
156 if (!defined $Job->{'uuid'}) {
157 chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
160 $job_id = $Job->{'uuid'};
169 Log (undef, "check slurm allocation");
172 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
176 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
177 push @sinfo, "$localcpus localhost";
179 if (exists $ENV{SLURM_NODELIST})
181 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
185 my ($ncpus, $slurm_nodelist) = split;
186 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
189 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
192 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
195 foreach (split (",", $ranges))
208 push @nodelist, map {
210 $n =~ s/\[[-,\d]+\]/$_/;
217 push @nodelist, $nodelist;
220 foreach my $nodename (@nodelist)
222 Log (undef, "node $nodename - $ncpus slots");
223 my $node = { name => $nodename,
227 foreach my $cpu (1..$ncpus)
229 push @slot, { node => $node,
233 push @node, @nodelist;
238 # Ensure that we get one jobstep running on each allocated node before
239 # we start overloading nodes with concurrent steps
241 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
248 # Claim this job, and make sure nobody else does
250 $Job->{'is_locked_by'} = $User->{'uuid'};
251 $Job->{'started_at'} = gmtime;
252 $Job->{'running'} = 1;
253 $Job->{'success'} = undef;
254 $Job->{'tasks_summary'} = { 'failed' => 0,
# NOTE(review): `==` compares UUID strings numerically (both coerce to 0, so
# this is nearly always "true" even when the lock was lost); should be `eq`.
258 unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
259 croak("Error while updating / locking job");
264 Log (undef, "start");
# Signal policy: INT/QUIT/TSTP request a checkpoint-and-stop, ALRM requests a
# progress/info dump, CONT resumes after a checkpoint, TERM aborts via croak.
265 $SIG{'INT'} = sub { $main::please_freeze = 1; };
266 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
267 $SIG{'TERM'} = \&croak;
268 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
269 $SIG{'ALRM'} = sub { $main::please_info = 1; };
270 $SIG{'CONT'} = sub { $main::please_continue = 1; };
271 $main::please_freeze = 0;
272 $main::please_info = 0;
273 $main::please_continue = 0;
274 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Only NOCACHE* knobs are exported into the environment; other knobs are
# logged but not exported (see the knob-logging loop further down).
276 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
277 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
278 $ENV{"JOB_UUID"} = $job_id;
# Jobstep bookkeeping: todo/done queues hold indexes into @jobstep.
282 my @jobstep_todo = ();
283 my @jobstep_done = ();
284 my @jobstep_tomerge = ();
285 my $jobstep_tomerge_level = 0;
287 my $squeue_kill_checked;
288 my $output_in_keep = 0;
# Resuming from a frozen checkpoint restores @jobstep et al. from Keep.
292 if (defined $Job->{thawedfromkey})
294 thaw ($Job->{thawedfromkey});
# Fresh start: create the level-0 bootstrap task via the API.
298 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
299 'job_uuid' => $Job->{'uuid'},
304 push @jobstep, { 'level' => 0,
306 'arvados_task' => $first_task,
308 push @jobstep_todo, 0;
# The checkout-and-build helper script is embedded after __DATA__ (see the
# bottom of this file); it is piped to perl on each node during install.
315 $build_script = <DATA>;
319 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
# A local absolute path as revision (no SLURM) means "run the tree in place"
# and skip the whole install phase.
321 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
324 $ENV{"CRUNCH_SRC"} = $Job->{revision};
328 Log (undef, "Install revision ".$Job->{revision});
329 my $nodelist = join(",", @node);
331 # Clean out crunch_tmp/work and crunch_tmp/opt
# Fork a child to run the cleanup on every node via srun; parent polls for
# completion while staying responsive to freeze requests.
333 my $cleanpid = fork();
336 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
337 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
342 last if $cleanpid == waitpid (-1, WNOHANG);
343 freeze_if_want_freeze ($cleanpid);
344 select (undef, undef, undef, 0.1);
346 Log (undef, "Clean-work-dir exited $?");
348 # Install requested code version
# Resolve script_version (branch/tag/sha/subversion-r#) to a full commit id,
# then archive it and install it on every node via srun + the build script.
351 my @srunargs = ("srun",
352 "--nodelist=$nodelist",
353 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
355 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
356 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
357 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
361 my $treeish = $Job->{'script_version'};
362 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
363 # Todo: let script_version specify repository instead of expecting
364 # parent process to figure it out.
365 $ENV{"CRUNCH_SRC_URL"} = $repo;
367 # Create/update our clone of the remote git repo
# NOTE(review): $repo and CRUNCH_SRC are interpolated into shell commands
# below (system string form, backticks); a path containing shell
# metacharacters would break or inject. Consider list-form system()/capturex.
369 if (!-d $ENV{"CRUNCH_SRC"}) {
370 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
371 or croak ("git clone $repo failed: exit ".($?>>8));
372 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
374 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
376 # If this looks like a subversion r#, look for it in git-svn commit messages
378 if ($treeish =~ m{^\d{1,4}$}) {
379 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
381 if ($gitlog =~ /^[a-f0-9]{40}$/) {
383 Log (undef, "Using commit $commit for revision $treeish");
387 # If that didn't work, try asking git to look it up as a tree-ish.
389 if (!defined $commit) {
391 my $cooked_treeish = $treeish;
392 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
393 # Looks like a git branch name -- make sure git knows it's
394 # relative to the remote repo
395 $cooked_treeish = "origin/$treeish";
398 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
400 if ($found =~ /^[0-9a-f]{40}$/s) {
402 if ($commit ne $treeish) {
403 # Make sure we record the real commit id in the database,
404 # frozentokey, logs, etc. -- instead of an abbreviation or a
405 # branch name which can become ambiguous or point to a
406 # different commit in the future.
407 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
408 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this inner `if ($commit ne $treeish)` duplicates the
# condition of the enclosing one at orig line 402 -- it is always true here.
409 if ($commit ne $treeish) {
410 $Job->{'script_version'} = $commit;
411 $Job->save() or croak("Error while updating job");
# With a resolved commit, ship `git archive` output plus the build script to
# each node, where "perl -" runs the build script to unpack and install it.
417 if (defined $commit) {
418 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
419 @execargs = ("sh", "-c",
420 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
421 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
424 croak ("could not figure out commit id for $treeish");
# Same fork/poll pattern as the cleanup step above.
427 my $installpid = fork();
428 if ($installpid == 0)
430 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
435 last if $installpid == waitpid (-1, WNOHANG);
436 freeze_if_want_freeze ($installpid);
437 select (undef, undef, undef, 0.1);
439 Log (undef, "Install exited $?");
463 my $thisround_succeeded = 0;
464 my $thisround_failed = 0;
465 my $thisround_failed_multiple = 0;
467 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
468 or $a <=> $b } @jobstep_todo;
469 my $level = $jobstep[$jobstep_todo[0]]->{level};
470 Log (undef, "start level $level");
475 my @freeslot = (0..$#slot);
478 my $progress_is_dirty = 1;
479 my $progress_stats_updated = 0;
481 update_progress_stats();
# Main dispatch loop (THISROUND): walk the todo list for the current level,
# forking one child per task into the next free (node,cpu) slot.
486 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
488 $main::please_continue = 0;
490 my $id = $jobstep_todo[$todo_ptr];
491 my $Jobstep = $jobstep[$id];
# Only tasks at the current level run this round; deeper levels wait.
492 if ($Jobstep->{level} != $level)
# Hard retry cap: a step that has failed 10 times is abandoned.
496 if ($Jobstep->{attempts} > 9)
498 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
# Non-blocking pipe: child writes stderr through "writer", parent polls
# $reader{$id} (readfrompipes) without blocking the scheduler.
503 pipe $reader{$id}, "writer" or croak ($!);
504 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
505 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
507 my $childslot = $freeslot[0];
508 my $childnode = $slot[$childslot]->{node};
509 my $childslotname = join (".",
510 $slot[$childslot]->{node}->{name},
511 $slot[$childslot]->{cpu});
512 my $childpid = fork();
# ---- child: reset signal handlers, redirect stdout/stderr into the pipe,
# build the task environment, and exec the job script under srun.
515 $SIG{'INT'} = 'DEFAULT';
516 $SIG{'QUIT'} = 'DEFAULT';
517 $SIG{'TERM'} = 'DEFAULT';
519 foreach (values (%reader))
523 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
524 open(STDOUT,">&writer");
525 open(STDERR,">&writer");
530 delete $ENV{"GNUPGHOME"};
531 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
532 $ENV{"TASK_QSEQUENCE"} = $id;
533 $ENV{"TASK_SEQUENCE"} = $level;
534 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Export script_parameters as JOB_PARAMETER_<NAME> env vars.
535 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
536 $param =~ tr/a-z/A-Z/;
537 $ENV{"JOB_PARAMETER_$param"} = $value;
539 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
540 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
541 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
542 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
548 "--nodelist=".$childnode->{name},
549 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
550 "--job-name=$job_id.$id.$$",
552 my @execargs = qw(sh);
553 my $build_script_to_send = "";
555 "mkdir -p $ENV{CRUNCH_TMP}/revision "
556 ."&& cd $ENV{CRUNCH_TMP} ";
559 $build_script_to_send = $build_script;
563 elsif (!$skip_install)
568 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
570 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
571 ." && ./installrevision "
575 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
577 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
578 my @execargs = ('bash', '-c', $command);
579 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# ---- parent: record the child in %proc / @slot bookkeeping.
583 if (!defined $childpid)
590 $proc{$childpid} = { jobstep => $id,
593 jobstepname => "$job_id.$id.$childpid",
595 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
596 $slot[$childslot]->{pid} = $childpid;
598 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
599 Log ($id, "child $childpid started on $childslotname");
600 $Jobstep->{attempts} ++;
601 $Jobstep->{starttime} = time;
602 $Jobstep->{node} = $childnode->{name};
603 $Jobstep->{slotindex} = $childslot;
604 delete $Jobstep->{stderr};
605 delete $Jobstep->{finishtime};
# Remove the launched step from todo (loop index compensates elsewhere).
607 splice @jobstep_todo, $todo_ptr, 1;
610 $progress_is_dirty = 1;
# Inner wait loop: while slots are exhausted (or todo is drained), service
# signals, reap children, read pipes, and refresh progress stats.
614 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
616 last THISROUND if $main::please_freeze;
# SIGALRM: checkpoint + flush collated output/log without stopping.
617 if ($main::please_info)
619 $main::please_info = 0;
623 update_progress_stats();
631 update_progress_stats();
632 select (undef, undef, undef, 0.1);
# Keep the API-side task summary fresh at least every 30 seconds.
634 elsif (time - $progress_stats_updated >= 30)
636 update_progress_stats();
# Abort the round when repeated failures dominate: >=8 multi-attempt
# failures with zero successes, or >=16 outnumbering the successes.
638 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
639 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
641 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
642 .($thisround_failed+$thisround_succeeded)
643 .") -- giving up on this round";
644 Log (undef, $message);
648 # move slots from freeslot to holdslot (or back to freeslot) if necessary
649 for (my $i=$#freeslot; $i>=0; $i--) {
650 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
651 push @holdslot, (splice @freeslot, $i, 1);
654 for (my $i=$#holdslot; $i>=0; $i--) {
655 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
656 push @freeslot, (splice @holdslot, $i, 1);
660 # give up if no nodes are succeeding
661 if (!grep { $_->{node}->{losing_streak} == 0 &&
662 $_->{node}->{hold_count} < 4 } @slot) {
663 my $message = "Every node has failed -- giving up on this round";
664 Log (undef, $message);
# Reset hold/streak state before the next round so banned nodes get retried.
671 push @freeslot, splice @holdslot;
672 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
675 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
678 goto THISROUND if $main::please_continue;
679 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
684 update_progress_stats();
685 select (undef, undef, undef, 0.1);
686 killem (keys %proc) if $main::please_freeze;
690 update_progress_stats();
691 freeze_if_want_freeze();
# Post-round verdict: a round with only failures (or >4 failures and no
# successes) stops the job; otherwise loop back for the next level.
694 if (!defined $success)
697 $thisround_succeeded == 0 &&
698 ($thisround_failed == 0 || $thisround_failed > 4))
700 my $message = "stop because $thisround_failed tasks failed and none succeeded";
701 Log (undef, $message);
710 goto ONELEVEL if !defined $success;
# Job complete: free the SLURM allocation, collate the per-task outputs into
# the final manifest, and register it as a collection (best-effort).
713 release_allocation();
715 $Job->{'output'} = &collate_output();
# success requires both a collated output and no fatal round failure.
716 $Job->{'success'} = $Job->{'output'} && $success;
719 if ($Job->{'output'})
722 my $manifest_text = capturex("whget", $Job->{'output'});
723 $arv->{'collections'}->{'create'}->execute('collection' => {
724 'uuid' => $Job->{'output'},
725 'manifest_text' => $manifest_text,
# Registration failure is logged but does not fail the job.
729 Log (undef, "Failed to register output manifest: $@");
733 Log (undef, "finish");
# Push todo/done/running counts into the job's tasks_summary (saved to the
# API) and log a one-line status. No-op unless something changed since the
# last call ($progress_is_dirty).
740 sub update_progress_stats
742 $progress_stats_updated = time;
743 return if !$progress_is_dirty;
# running = total slots minus free minus held.
744 my ($todo, $done, $running) = (scalar @jobstep_todo,
745 scalar @jobstep_done,
746 scalar @slot - scalar @freeslot - scalar @holdslot);
747 $Job->{'tasks_summary'} ||= {};
748 $Job->{'tasks_summary'}->{'todo'} = $todo;
749 $Job->{'tasks_summary'}->{'done'} = $done;
750 $Job->{'tasks_summary'}->{'running'} = $running;
752 Log (undef, "status: $done done, $running running, $todo todo");
753 $progress_is_dirty = 0;
# (reapchildren) Reap one finished task child, classify success/failure,
# update node health bookkeeping, and queue any tasks the child created.
# Returns 0 when no child has exited.
760 my $pid = waitpid (-1, WNOHANG);
761 return 0 if $pid <= 0;
763 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
765 . $slot[$proc{$pid}->{slot}]->{cpu});
766 my $jobstepid = $proc{$pid}->{jobstep};
767 my $elapsed = time - $proc{$pid}->{time};
768 my $Jobstep = $jobstep[$jobstepid];
771 my $exitinfo = "exit $exitcode";
# The authoritative success flag comes from the task record, not the exit
# code: reload the task and read what the task itself reported.
772 $Jobstep->{'arvados_task'}->reload;
773 my $success = $Jobstep->{'arvados_task'}->{success};
775 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
777 if (!defined $success) {
778 # task did not indicate one way or the other --> fail
779 $Jobstep->{'arvados_task'}->{success} = 0;
780 $Jobstep->{'arvados_task'}->save;
# --- failure path ---
# node_fail (set by preprocess_stderr) means "blame the node, not the task":
# don't count this run against the step's attempt limit.
786 my $no_incr_attempts;
787 $no_incr_attempts = 1 if $Jobstep->{node_fail};
790 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
792 # Check for signs of a failed or misconfigured node
793 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
794 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
795 # Don't count this against jobstep failure thresholds if this
796 # node is already suspected faulty and srun exited quickly
797 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
799 $Jobstep->{attempts} > 1) {
800 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
801 $no_incr_attempts = 1;
802 --$Jobstep->{attempts};
# A node on a long losing streak is banned for a while (ban_node_by_slot).
804 ban_node_by_slot($proc{$pid}->{slot});
# Failed steps go back on the todo queue for retry.
807 push @jobstep_todo, $jobstepid;
808 Log ($jobstepid, "failure in $elapsed seconds");
810 --$Jobstep->{attempts} if $no_incr_attempts;
811 $Job->{'tasks_summary'}->{'failed'}++;
# --- success path: clear the node's failure history, mark step done. ---
815 ++$thisround_succeeded;
816 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
817 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
818 push @jobstep_done, $jobstepid;
819 Log ($jobstepid, "success in $elapsed seconds");
821 $Jobstep->{exitcode} = $exitcode;
822 $Jobstep->{finishtime} = time;
823 process_stderr ($jobstepid, $success);
824 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Tear down the stderr pipe and free the slot for the next task.
826 close $reader{$jobstepid};
827 delete $reader{$jobstepid};
828 delete $slot[$proc{$pid}->{slot}]->{pid};
829 push @freeslot, $proc{$pid}->{slot};
# Pick up any child tasks this task created via the API and queue them.
833 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
835 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
837 'order' => 'qsequence'
839 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
841 'level' => $arvados_task->{'sequence'},
843 'arvados_task' => $arvados_task
845 push @jobstep, $jobstep;
846 push @jobstep_todo, $#jobstep;
849 $progress_is_dirty = 1;
# (check_squeue) Detect srun children whose SLURM jobstep has silently
# disappeared: compare our child procs against `squeue` output and schedule
# a kill for any orphan. Rate-limited: kill list every 4s, squeue every 60s.
856 # return if the kill list was checked <4 seconds ago
857 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
861 $squeue_kill_checked = time;
863 # use killem() on procs whose killtime is reached
866 if (exists $proc{$_}->{killtime}
867 && $proc{$_}->{killtime} <= time)
873 # return if the squeue was checked <60 seconds ago
874 if (defined $squeue_checked && $squeue_checked > time - 60)
878 $squeue_checked = time;
882 # here is an opportunity to check for mysterious problems with local procs
886 # get a list of steps still running
# Trailing "echo ok" marks a successful squeue run; without it we can't
# distinguish "no steps" from "squeue failed".
887 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
889 if ($squeue[-1] ne "ok")
895 # which of my jobsteps are running, according to squeue?
899 if (/^(\d+)\.(\d+) (\S+)/)
901 if ($1 eq $ENV{SLURM_JOBID})
908 # which of my active child procs (>60s old) were not mentioned by squeue?
911 if ($proc{$_}->{time} < time - 60
912 && !exists $ok{$proc{$_}->{jobstepname}}
913 && !exists $proc{$_}->{killtime})
915 # kill this proc if it hasn't exited in 30 seconds
916 $proc{$_}->{killtime} = time + 30;
# Release the SLURM allocation (no-op outside SLURM).
922 sub release_allocation
926 Log (undef, "release job allocation");
927 system "scancel $ENV{SLURM_JOBID}";
# (readfrompipes) Drain each child's non-blocking stderr pipe into its
# jobstep buffer, run line-oriented preprocessing, and cap the buffer by
# discarding the oldest 8KB once it exceeds 16KB.
935 foreach my $job (keys %reader)
938 while (0 < sysread ($reader{$job}, $buf, 8192))
940 print STDERR $buf if $ENV{CRUNCH_DEBUG};
941 $jobstep[$job]->{stderr} .= $buf;
942 preprocess_stderr ($job);
943 if (length ($jobstep[$job]->{stderr}) > 16384)
945 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer, log each one, and
# watch for srun error patterns: expired/unconfirmed allocation => freeze the
# whole job; node failure => mark the step node_fail and ban the node.
954 sub preprocess_stderr
958 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the consumed line (including its newline) from the buffer.
960 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
961 Log ($job, "stderr $line");
962 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
964 $main::please_freeze = 1;
966 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
967 $jobstep[$job]->{node_fail} = 1;
968 ban_node_by_slot($jobstep[$job]->{slotindex});
# (process_stderr) Flush whatever remains in the buffer after the child
# exits, logging each residual line.
978 preprocess_stderr ($job);
981 Log ($job, "stderr $_");
982 } split ("\n", $jobstep[$job]->{stderr});
# (collate_output) Combine each successful task's output into one job-level
# manifest written to the Warehouse; returns the resulting locator (or the
# single task's output verbatim when there is exactly one jobstep).
988 my $whc = Warehouse->new;
989 Log (undef, "collate");
990 $whc->write_start (1);
# Skip steps that failed or produced no output.
994 next if (!exists $_->{'arvados_task'}->{output} ||
995 !$_->{'arvados_task'}->{'success'} ||
996 $_->{'exitcode'} != 0);
997 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare content hash is treated as literal manifest text.
998 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1000 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1001 $whc->write_data ($output);
1003 elsif (@jobstep == 1)
1005 $joboutput = $output;
# Otherwise fetch the referenced block and append its contents.
1008 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1010 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1011 $whc->write_data ($outblock);
# Fetch failure is recorded inside the manifest rather than aborting.
1015 my $errstr = $whc->errstr;
1016 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1020 $joboutput = $whc->write_finish if !defined $joboutput;
1023 Log (undef, "output $joboutput");
1024 $Job->{'output'} = $joboutput;
1029 Log (undef, "output undef");
# (killem) Escalating kill for srun children: SIGINT first, then SIGTERM,
# then SIGKILL, stepping up only after a signal has gone unanswered for >4s.
1039 my $sig = 2; # SIGINT first
1040 if (exists $proc{$_}->{"sent_$sig"} &&
1041 time - $proc{$_}->{"sent_$sig"} > 4)
1043 $sig = 15; # SIGTERM if SIGINT doesn't work
1045 if (exists $proc{$_}->{"sent_$sig"} &&
1046 time - $proc{$_}->{"sent_$sig"} > 4)
1048 $sig = 9; # SIGKILL if SIGTERM doesn't work
1050 if (!exists $proc{$_}->{"sent_$sig"})
1052 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1054 select (undef, undef, undef, 0.1);
1057 kill $sig, $_; # srun wants two SIGINT to really interrupt
1059 $proc{$_}->{"sent_$sig"} = time;
1060 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# (fhbits) Build a select() bit vector from a list of filehandles.
1070 vec($bits,fileno($_),1) = 1;
1076 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are logged one line at a time (recursively).
1078 if ($_[1] =~ /\n/) {
1079 for my $line (split (/\n/, $_[1])) {
# Unbuffer STDERR so interleaved child output stays ordered.
1084 my $fh = select STDERR; $|=1; select $fh;
1085 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable bytes as \NNN octal so the log stays one-line-safe.
1086 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1089 if ($metastream || -t STDERR) {
1090 my @gmtime = gmtime;
1091 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1092 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Timestamp goes to interactive STDERR and to the metastream; plain message
# otherwise (e.g. when stderr is already being timestamped upstream).
1094 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1096 return if !$metastream;
1097 $metastream->write_data ($datetime . " " . $message);
# (croak) Fatal-error path: log the message with caller context, checkpoint
# any outstanding work, flush the log to Keep, and mark the job failed.
1103 my ($package, $file, $line) = caller;
1104 my $message = "@_ at $file line $line\n";
1105 Log (undef, $message);
1106 freeze() if @jobstep_todo;
1107 collate_output() if @jobstep_todo;
1109 save_meta() if $metastream;
# (cleanup) Clear running/success flags on the API job record; only
# meaningful for jobs that have a real uuid.
1116 return if !$job_has_uuid;
1118 $Job->{'running'} = 0;
1119 $Job->{'success'} = 0;
1120 $Job->{'finished_at'} = gmtime;
# (save_meta) Flush the accumulated log stream to Keep and record its
# locator on the job. For checkpoints, work on a copy so logging continues;
# for the final save, retire $metastream entirely.
1127 my $justcheckpoint = shift; # false if this will be the last meta saved
1128 my $m = $metastream;
1129 $m = $m->copy if $justcheckpoint;
1131 my $loglocator = $m->as_key;
1132 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1133 Log (undef, "meta key is $loglocator");
1134 $Job->{'log'} = $loglocator;
# Honor a pending freeze request: release nodes, kill listed srun procs plus
# everything in %proc, reap them, then checkpoint and stop.
1139 sub freeze_if_want_freeze
1141 if ($main::please_freeze)
1143 release_allocation();
1146 # kill some srun procs before freeze+stop
1147 map { $proc{$_} = {} } @_;
1150 killem (keys %proc);
1151 select (undef, undef, undef, 0.1);
1153 while (($died = waitpid (-1, WNOHANG)) > 0)
1155 delete $proc{$died};
# (freeze) Checkpointing to Keep is currently a stub.
1170 Log (undef, "Freeze not implemented");
# (thaw) Restore job + jobstep state from a frozen key in the Warehouse.
# NOTE(review): croaks immediately -- the code below it is currently
# unreachable; confirm whether thaw is meant to be disabled.
1177 croak ("Thaw not implemented");
1181 Log (undef, "thaw from $key");
1186 @jobstep_tomerge = ();
1187 $jobstep_tomerge_level = 0;
1190 my $stream = new Warehouse::Stream ( whc => $whc,
1191 hash => [split (",", $key)] );
# Each "\n\n"-delimited chunk is either job metadata, a merge record, or one
# jobstep's key=value dump (values escaped via freezequote).
1193 while (my $dataref = $stream->read_until (undef, "\n\n"))
1195 if ($$dataref =~ /^job /)
1197 foreach (split ("\n", $$dataref))
1199 my ($k, $v) = split ("=", $_, 2);
1200 $frozenjob->{$k} = freezeunquote ($v);
1205 if ($$dataref =~ /^merge (\d+) (.*)/)
1207 $jobstep_tomerge_level = $1;
1209 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1214 foreach (split ("\n", $$dataref))
1216 my ($k, $v) = split ("=", $_, 2);
1217 $Jobstep->{$k} = freezeunquote ($v) if $k;
# Thawed steps start with a clean attempt count.
1219 $Jobstep->{attempts} = 0;
1220 push @jobstep, $Jobstep;
# NOTE(review): `eq "0"` string-compares the numeric exit code; an undef or
# "00" exitcode behaves surprisingly -- `== 0` with a defined() guard would
# be clearer.
1222 if ($Jobstep->{exitcode} eq "0")
1224 push @jobstep_done, $#jobstep;
1228 push @jobstep_todo, $#jobstep;
# Frozen job fields override whatever the caller supplied.
1232 foreach (qw (script script_version script_parameters))
1234 $Job->{$_} = $frozenjob->{$_};
# (freezeunquote) Reverse freezequote escaping: "\n" back to newline, any
# other backslash-escaped char back to itself.
1252 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# (srun) Run a command, prefixing it with the srun invocation only when a
# SLURM allocation exists; optionally feed $stdin to the child. Replaces the
# current process via exec unless $opts->{fork} is set.
1259 my $srunargs = shift;
1260 my $execargs = shift;
1261 my $opts = shift || {};
1263 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Echo the full command (with naive space-quoting) when debugging.
1264 print STDERR (join (" ",
1265 map { / / ? "'$_'" : $_ }
1268 if $ENV{CRUNCH_DEBUG};
# To supply stdin, fork a writer child and splice its output onto our STDIN.
1270 if (defined $stdin) {
1271 my $child = open STDIN, "-|";
1272 defined $child or die "no fork: $!";
1274 print $stdin or die $!;
1275 close STDOUT or die $!;
1280 return system (@$args) if $opts->{fork};
# exec only returns on failure; a huge environment is a common cause.
1283 warn "ENV size is ".length(join(" ",%ENV));
1284 die "exec failed: $!: @$args";
1288 sub ban_node_by_slot {
1289 # Don't start any new jobsteps on this node for 60 seconds
# hold_until is consumed by the freeslot/holdslot shuffling in the main
# loop; hold_count feeds the "every node has failed" give-up check.
1291 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1292 $slot[$slotid]->{node}->{hold_count}++;
1293 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1299 # checkout-and-build
# Embedded __DATA__ script: runs ON EACH COMPUTE NODE (piped to `perl -` via
# srun). Unpacks the git archive from stdin into CRUNCH_SRC, runs the tree's
# install hook, and records the installed commit in a symlink.
1303 my $destdir = $ENV{"CRUNCH_SRC"};
1304 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1305 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Lock file serializes concurrent installs into the same destdir.
1307 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# The "$destdir.commit" symlink names the commit already installed; if it
# matches, the install is skipped entirely.
1309 if (readlink ("$destdir.commit") eq $commit) {
1313 unlink "$destdir.commit";
1314 open STDOUT, ">", "$destdir.log";
1315 open STDERR, ">&STDOUT";
# Untar the git-archive stream (appended to this script on stdin).
1318 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1321 die "'tar -C $destdir -xf -' exited $?: $!";
1325 chomp ($pwd = `pwd`);
1326 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Prefer install.sh; fall back to tests/autotests.sh when install.sh absent.
1328 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1330 shell_or_die ("./tests/autotests.sh", $install_dir);
1331 } elsif (-e "./install.sh") {
1332 shell_or_die ("./install.sh", $install_dir);
# Atomically publish the installed commit id via rename of a fresh symlink.
1336 unlink "$destdir.commit.new";
1337 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1338 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# (shell_or_die) Run a command, echoing it under DEBUG, dying on failure.
1347 if ($ENV{"DEBUG"}) {
1348 print STDERR "@_\n";
1351 or die "@_ failed: $! exit 0x".sprintf("%x",$?);