2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or deallocated). Currently this is a no-op.
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
# Scratch-space setup: default TMPDIR to /tmp, then derive CRUNCH_TMP
# ($TMPDIR/crunch-job) and CRUNCH_WORK ($CRUNCH_TMP/work) and export them
# through %ENV so child processes see the same paths.
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
# The mkdir result is not checked, so an existing directory (or any other
# failure) is silently tolerated here.
80 mkdir ($ENV{"CRUNCH_TMP"});
# Command-line options. Only part of the option list is visible in this
# excerpt; the visible ones are a flag (--force-unlock) and three options
# that take a string value.
87 GetOptions('force-unlock' => \$force_unlock,
88 'git-dir=s' => \$git_dir,
90 'job-api-token=s' => \$job_api_token,
91 'resume-stash=s' => \$resume_stash,
# A token given on the command line replaces ARVADOS_API_TOKEN in the
# environment.
94 if (defined $job_api_token) {
95 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# Slurm is considered available only when BOTH SLURM_JOBID and
# SLURM_NODELIST exist in the environment.
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# True when the job spec consists solely of lowercase letters, digits and
# hyphens; later code uses this flag to decide whether to save the job record.
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
# CRUNCH_DEBUG is forced to 1 or 0; the condition choosing between the two
# assignments is not visible in this excerpt.
108 $main::ENV{CRUNCH_DEBUG} = 1;
112 $main::ENV{CRUNCH_DEBUG} = 0;
# Create the API client and a Warehouse stream that collects the job log
# under the name 'log.txt'.
117 my $arv = Arvados->new;
118 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
120 $metastream->write_start('log.txt');
# Fetch the job record by UUID, and the current user, through the API.
# (The conditional selecting this path is not visible in this excerpt.)
129 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
130 $User = $arv->{'users'}->{'current'}->execute;
# Unless --force-unlock was given, refuse to proceed when the job is locked,
# already has a result, is flagged as running, or has a start time.
131 if (!$force_unlock) {
132 if ($Job->{'is_locked_by'}) {
133 croak("Job is locked: " . $Job->{'is_locked_by'});
# NOTE(review): `ne undef` is a string comparison against undef (which
# stringifies to ""), so this tests "not the empty string" and warns under
# `use warnings`; `defined $Job->{'success'}` looks like the intent -- confirm.
135 if ($Job->{'success'} ne undef) {
136 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
138 if ($Job->{'running'}) {
139 croak("Job 'running' flag is already set");
141 if ($Job->{'started_at'}) {
142 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Alternative path: the job spec itself is JSON and is decoded into the job
# record.
148 $Job = JSON::decode_json($jobspec);
# Abort if any of the three required job attributes is missing or false.
# (map is used purely for its side effect here.)
152 map { croak ("No $_ specified") unless $Job->{$_} }
153 qw(script script_version script_parameters);
# A job without a uuid gets a synthetic identifier built from the short
# hostname, the current epoch time and this process's pid.
156 if (!defined $Job->{'uuid'}) {
157 my $hostname = `hostname -s`;
159 $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
162 $job_id = $Job->{'uuid'};
# Ensure resource_limits exists; max_tasks_per_node defaults to 0, which the
# slot-building code treats as "no cap" (it only caps when the value is true).
166 $Job->{'resource_limits'} ||= {};
167 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
168 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
# Node and slot discovery: build @sinfo entries of the form "<ncpus> <nodes>",
# expand them into individual node names, and create one slot per CPU.
171 Log (undef, "check slurm allocation");
174 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Count "processor" lines in /proc/cpuinfo; `+` binds tighter than `||`, so a
# count of 0 (or a failed grep) falls back to 1.
178 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
179 push @sinfo, "$localcpus localhost";
# With a slurm node list, ask sinfo for CPU count (%c) and node names (%N).
181 if (exists $ENV{SLURM_NODELIST})
183 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Each entry is whitespace-split (on $_) into a CPU count and a node-list
# expression; the CPU count is capped by max_tasks_per_node when that is set.
187 my ($ncpus, $slurm_nodelist) = split;
188 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated element at a time off the front of the node list,
# keeping an optional bracketed "[...]" suffix attached to its element.
191 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
# An element containing a bracketed list of numbers and/or ranges is expanded:
# each number replaces the bracket expression to form one node name.
194 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
197 foreach (split (",", $ranges))
210 push @nodelist, map {
212 $n =~ s/\[[-,\d]+\]/$_/;
# Otherwise the element is taken as a single node name.
219 push @nodelist, $nodelist;
# Create a node record per name and one slot per CPU (numbered from 1).
222 foreach my $nodename (@nodelist)
224 Log (undef, "node $nodename - $ncpus slots");
225 my $node = { name => $nodename,
229 foreach my $cpu (1..$ncpus)
231 push @slot, { node => $node,
235 push @node, @nodelist;
240 # Ensure that we get one jobstep running on each allocated node before
241 # we start overloading nodes with concurrent steps
243 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
250 # Claim this job, and make sure nobody else does
252 $Job->{'is_locked_by'} = $User->{'uuid'};
# NOTE(review): gmtime in scalar context yields a ctime-style string
# ("Thu Sep  9 ..."), not an ISO timestamp -- confirm the API accepts it.
253 $Job->{'started_at'} = gmtime;
254 $Job->{'running'} = 1;
255 $Job->{'success'} = undef;
256 $Job->{'tasks_summary'} = { 'failed' => 0,
# NOTE(review): `==` compares numerically. If the uuids are non-numeric
# strings both sides numify to 0, so this check passes whenever save()
# succeeds, regardless of who holds the lock; `eq` looks intended -- confirm.
261 unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
262 croak("Error while updating / locking job");
268 Log (undef, "start");
# Signal handlers only set flags that the main loop polls: INT, QUIT and TSTP
# request a freeze, ALRM requests an info/checkpoint pass, CONT requests
# "continue". TERM goes straight to this file's croak().
269 $SIG{'INT'} = sub { $main::please_freeze = 1; };
270 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
271 $SIG{'TERM'} = \&croak;
272 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
273 $SIG{'ALRM'} = sub { $main::please_info = 1; };
274 $SIG{'CONT'} = sub { $main::please_continue = 1; };
275 $main::please_freeze = 0;
276 $main::please_info = 0;
277 $main::please_continue = 0;
278 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every knob line of the form NOCACHE...=value into the environment.
# (grep is used only for its side effect; its result is discarded.)
280 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
# The job id is exported under two names.
281 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
282 $ENV{"JOB_UUID"} = $job_id;
# Scheduling state: @jobstep_todo and @jobstep_done hold indexes into @jobstep.
286 my @jobstep_todo = ();
287 my @jobstep_done = ();
288 my @jobstep_tomerge = ();
289 my $jobstep_tomerge_level = 0;
291 my $squeue_kill_checked;
292 my $output_in_keep = 0;
# Resume from a frozen state when the job record names one ...
296 if (defined $Job->{thawedfromkey})
298 thaw ($Job->{thawedfromkey});
# ... otherwise create the first task through the API and queue it as jobstep
# 0 at level 0. (The else-branch structure is not visible in this excerpt.)
302 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
303 'job_uuid' => $Job->{'uuid'},
308 push @jobstep, { 'level' => 0,
310 'arvados_task' => $first_task,
312 push @jobstep_todo, 0;
# Read the build script from the DATA handle. How much is read depends on $/,
# which is set (if at all) on lines not visible in this excerpt.
319 $build_script = <DATA>;
# NOTE(review): this section reads $Job->{revision}, while the install section
# and the required-attribute check use 'script_version' -- confirm that
# 'revision' is still populated.
323 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
# Installation is skipped only when not running under slurm AND the revision
# is an absolute path (starts with "/").
325 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
328 $ENV{"CRUNCH_SRC"} = $Job->{revision};
332 Log (undef, "Install revision ".$Job->{revision});
333 my $nodelist = join(",", @node);
335 # Clean out crunch_tmp/work and crunch_tmp/opt
# A forked process runs the cleanup through srun(): unmount anything mounted
# under $CRUNCH_WORK, then remove $CRUNCH_WORK and $CRUNCH_TMP/opt.
337 my $cleanpid = fork();
340 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
341 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
# Poll every 0.1 s for the cleanup child to exit, honouring freeze requests
# in the meantime.
346 last if $cleanpid == waitpid (-1, WNOHANG);
347 freeze_if_want_freeze ($cleanpid);
348 select (undef, undef, undef, 0.1);
350 Log (undef, "Clean-work-dir exited $?");
352 # Install requested code version
# srun arguments for the install step: all nodes, working directory $TMPDIR,
# job name equal to the job id.
355 my @srunargs = ("srun",
356 "--nodelist=$nodelist",
357 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
# Source is checked out under $CRUNCH_TMP/src and installed to $CRUNCH_TMP/opt.
359 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
360 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
361 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
365 my $treeish = $Job->{'script_version'};
# --git-dir wins over the CRUNCH_DEFAULT_GIT_DIR environment variable.
366 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
367 # Todo: let script_version specify repository instead of expecting
368 # parent process to figure it out.
369 $ENV{"CRUNCH_SRC_URL"} = $repo;
371 # Create/update our clone of the remote git repo
373 if (!-d $ENV{"CRUNCH_SRC"}) {
374 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
375 or croak ("git clone $repo failed: exit ".($?>>8));
376 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
# The remote URL is passed via the shell variable $CRUNCH_SRC_URL (escaped so
# the shell, not Perl, expands it). The fetch result is not checked.
378 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
380 # If this looks like a subversion r#, look for it in git-svn commit messages
# "Looks like" means 1 to 4 decimal digits.
382 if ($treeish =~ m{^\d{1,4}$}) {
383 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
# Accept the result only when it is a single 40-hex-digit commit id.
385 if ($gitlog =~ /^[a-f0-9]{40}$/) {
387 Log (undef, "Using commit $commit for revision $treeish");
391 # If that didn't work, try asking git to look it up as a tree-ish.
393 if (!defined $commit) {
395 my $cooked_treeish = $treeish;
396 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
397 # Looks like a git branch name -- make sure git knows it's
398 # relative to the remote repo
399 $cooked_treeish = "origin/$treeish";
# NOTE(review): $cooked_treeish is interpolated unquoted into a shell command;
# if script_version can come from an untrusted source this allows shell
# injection -- consider the list form of a command runner.
402 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
404 if ($found =~ /^[0-9a-f]{40}$/s) {
406 if ($commit ne $treeish) {
407 # Make sure we record the real commit id in the database,
408 # frozentokey, logs, etc. -- instead of an abbreviation or a
409 # branch name which can become ambiguous or point to a
410 # different commit in the future.
411 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
412 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this repeats the `$commit ne $treeish` test made a few lines
# above; if it is nested inside that block it is redundant -- confirm.
413 if ($commit ne $treeish) {
414 $Job->{'script_version'} = $commit;
# Save only when the job has a real uuid; a failed save is fatal.
415 !$job_has_uuid or $Job->save() or croak("Error while updating job");
# With a resolved commit, prepare the remote command (create the install dir,
# then run a Perl script read from stdin) and capture `git archive` output
# for that commit into $git_archive.
421 if (defined $commit) {
422 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
423 @execargs = ("sh", "-c",
424 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
425 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
# No commit could be determined: give up.
428 croak ("could not figure out commit id for $treeish");
# Run the install in a forked child: srun() receives the build script with the
# git archive appended as its stdin payload.
# NOTE(review): an undef return from fork() (failure) also satisfies
# `$installpid == 0` numerically, so the parent would take the child branch --
# confirm whether a defined-ness check exists on lines not shown.
431 my $installpid = fork();
432 if ($installpid == 0)
434 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Poll every 0.1 s until the install child exits, honouring freeze requests.
439 last if $installpid == waitpid (-1, WNOHANG);
440 freeze_if_want_freeze ($installpid);
441 select (undef, undef, undef, 0.1);
443 Log (undef, "Install exited $?");
# Log the main job attributes; reference values are rendered as JSON.
448 foreach (qw (script script_version script_parameters resource_limits))
452 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
# Log each knob line individually.
454 foreach (split (/\n/, $Job->{knobs}))
456 Log (undef, "knob " . $_);
# Per-round counters: successes, failures, and failures of steps that had
# already been attempted more than once.
467 my $thisround_succeeded = 0;
468 my $thisround_failed = 0;
469 my $thisround_failed_multiple = 0;
# Order pending steps by level, then by jobstep index; the round works on the
# lowest level present.
471 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
472 or $a <=> $b } @jobstep_todo;
473 my $level = $jobstep[$jobstep_todo[0]]->{level};
474 Log (undef, "start level $level");
# All slots start out free (stored as indexes into @slot).
479 my @freeslot = (0..$#slot);
# Force an initial progress report.
482 my $progress_is_dirty = 1;
483 my $progress_stats_updated = 0;
485 update_progress_stats();
# Walk the pending-step list and start a child process for each step of the
# current level. Several structural lines (braces, the child/parent branch
# test) are missing from this excerpt.
490 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
492 $main::please_continue = 0;
494 my $id = $jobstep_todo[$todo_ptr];
495 my $Jobstep = $jobstep[$id];
# Steps of another level are not handled in this round (the action taken is on
# a line not shown).
496 if ($Jobstep->{level} != $level)
# A step that has been attempted more than 9 times is abandoned.
500 if ($Jobstep->{attempts} > 9)
502 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
# Pipe for the child's output: the read end is stored per step in %reader and
# switched to non-blocking mode; the write end is the symbolic handle "writer".
507 pipe $reader{$id}, "writer" or croak ($!);
508 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
509 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Use the first free slot; its name is "<node>.<cpu>".
511 my $childslot = $freeslot[0];
512 my $childnode = $slot[$childslot]->{node};
513 my $childslotname = join (".",
514 $slot[$childslot]->{node}->{name},
515 $slot[$childslot]->{cpu});
516 my $childpid = fork();
# Restore default signal handling (presumably in the child branch -- the
# branch test is not visible here).
519 $SIG{'INT'} = 'DEFAULT';
520 $SIG{'QUIT'} = 'DEFAULT';
521 $SIG{'TERM'} = 'DEFAULT';
523 foreach (values (%reader))
# NOTE(review): F_SETFL sets file *status* flags; close-on-exec is a
# descriptor flag controlled through F_SETFD, so this call may not do what the
# trailing comment says -- confirm.
527 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
# Send both stdout and stderr into the pipe (results of open are unchecked).
528 open(STDOUT,">&writer");
529 open(STDERR,">&writer");
# Environment handed to the task: task/job identifiers, every script parameter
# as JOB_PARAMETER_<NAME> (a-z upcased), and slot information.
534 delete $ENV{"GNUPGHOME"};
535 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
536 $ENV{"TASK_QSEQUENCE"} = $id;
537 $ENV{"TASK_SEQUENCE"} = $level;
538 $ENV{"JOB_SCRIPT"} = $Job->{script};
539 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
540 $param =~ tr/a-z/A-Z/;
541 $ENV{"JOB_PARAMETER_$param"} = $value;
543 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
544 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
545 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
546 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
552 "--nodelist=".$childnode->{name},
553 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
554 "--job-name=$job_id.$id.$$",
# NOTE(review): @execargs is declared with `my` here and again just before the
# srun() call below; if both are in the same scope the first is dead and Perl
# warns about masking -- confirm.
556 my @execargs = qw(sh);
557 my $build_script_to_send = "";
559 "mkdir -p $ENV{CRUNCH_TMP}/revision "
560 ."&& cd $ENV{CRUNCH_TMP} ";
563 $build_script_to_send = $build_script;
567 elsif (!$skip_install)
572 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
574 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
575 ." && ./installrevision "
579 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
581 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
# The assembled shell command is run by bash via srun(); the build script (or
# an empty string) is supplied as its stdin payload.
582 my @execargs = ('bash', '-c', $command);
583 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# Parent-side bookkeeping after fork. The handling of a failed fork is on
# lines not visible in this excerpt.
587 if (!defined $childpid)
# Register the child in %proc, keyed by pid. The jobstepname has the same
# "<job>.<step>.<pid>" shape as the --job-name passed to srun.
594 $proc{$childpid} = { jobstep => $id,
597 jobstepname => "$job_id.$id.$childpid",
# A slot must not already be owned by another pid.
599 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
600 $slot[$childslot]->{pid} = $childpid;
602 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
603 Log ($id, "child $childpid started on $childslotname");
# Per-attempt state: count the attempt, record where and when it started, and
# clear leftovers from a previous attempt.
604 $Jobstep->{attempts} ++;
605 $Jobstep->{starttime} = time;
606 $Jobstep->{node} = $childnode->{name};
607 $Jobstep->{slotindex} = $childslot;
608 delete $Jobstep->{stderr};
609 delete $Jobstep->{finishtime};
# The step is no longer pending.
611 splice @jobstep_todo, $todo_ptr, 1;
614 $progress_is_dirty = 1;
# Tail of a loop condition whose first part is not visible: true while some
# slot is busy and there is no further pending step after the current one.
618 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
620 last THISROUND if $main::please_freeze;
621 if ($main::please_info)
623 $main::please_info = 0;
627 update_progress_stats();
635 update_progress_stats();
636 select (undef, undef, undef, 0.1);
# Refresh progress at least every 30 seconds.
638 elsif (time - $progress_stats_updated >= 30)
640 update_progress_stats();
# Abandon the round when repeat failures dominate: 8 or more with no success
# at all, or 16 or more and outnumbering the successes.
642 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
643 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
645 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
646 .($thisround_failed+$thisround_succeeded)
647 .") -- giving up on this round";
648 Log (undef, $message);
652 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops run from the end of the array so that splice() does not disturb
# the indexes still to be visited.
653 for (my $i=$#freeslot; $i>=0; $i--) {
654 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
655 push @holdslot, (splice @freeslot, $i, 1);
658 for (my $i=$#holdslot; $i>=0; $i--) {
659 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
660 push @freeslot, (splice @holdslot, $i, 1);
664 # give up if no nodes are succeeding
# A node counts as healthy when its losing streak is 0 and it has been put on
# hold fewer than 4 times.
665 if (!grep { $_->{node}->{losing_streak} == 0 &&
666 $_->{node}->{hold_count} < 4 } @slot) {
667 my $message = "Every node has failed -- giving up on this round";
668 Log (undef, $message);
# Return every held slot to the free list and reset the losing streak of the
# nodes behind all free slots.
675 push @freeslot, splice @holdslot;
676 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
# End of round: wait for the children that are still running.
679 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
# A "continue" request re-enters the round at label THISROUND.
682 goto THISROUND if $main::please_continue;
# An info request clears the flag, then freezes, collates output and saves a
# checkpoint copy of the log (save_meta called with a true argument).
683 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
688 update_progress_stats();
689 select (undef, undef, undef, 0.1);
# While a freeze is pending, keep signalling the remaining children.
690 killem (keys %proc) if $main::please_freeze;
694 update_progress_stats();
695 freeze_if_want_freeze();
# Decide the overall result when it is still undetermined; only part of the
# condition is visible here (no successes, and either no failures or more
# than 4).
698 if (!defined $success)
701 $thisround_succeeded == 0 &&
702 ($thisround_failed == 0 || $thisround_failed > 4))
704 my $message = "stop because $thisround_failed tasks failed and none succeeded";
705 Log (undef, $message);
# No verdict yet: run another level.
714 goto ONELEVEL if !defined $success;
# Job finished: release the allocation, collate task outputs and record the
# result.
717 release_allocation();
719 $Job->{'output'} = &collate_output();
# Success requires both an output and a true $success.
720 $Job->{'success'} = $Job->{'output'} && $success;
721 $Job->save if $job_has_uuid;
# When there is an output, fetch its manifest text with the external `whget`
# command and register it as a collection whose uuid is the output locator.
723 if ($Job->{'output'})
726 my $manifest_text = capturex("whget", $Job->{'output'});
727 $arv->{'collections'}->{'create'}->execute('collection' => {
728 'uuid' => $Job->{'output'},
729 'manifest_text' => $manifest_text,
# A failure to register is logged using $@ (the enclosing eval is on lines
# not shown).
733 Log (undef, "Failed to register output manifest: $@");
737 Log (undef, "finish");
# update_progress_stats: publish todo/done/running counts to the job record
# and the log. Takes no arguments. Does nothing beyond refreshing the
# timestamp unless $progress_is_dirty is set.
744 sub update_progress_stats
# The timestamp is refreshed even when nothing is dirty, which restarts the
# caller's 30-second refresh timer.
746 $progress_stats_updated = time;
747 return if !$progress_is_dirty;
# running = all slots minus those that are free or on hold.
748 my ($todo, $done, $running) = (scalar @jobstep_todo,
749 scalar @jobstep_done,
750 scalar @slot - scalar @freeslot - scalar @holdslot);
751 $Job->{'tasks_summary'} ||= {};
752 $Job->{'tasks_summary'}->{'todo'} = $todo;
753 $Job->{'tasks_summary'}->{'done'} = $done;
754 $Job->{'tasks_summary'}->{'running'} = $running;
# Only jobs with a real uuid are saved.
755 $Job->save if $job_has_uuid;
756 Log (undef, "status: $done done, $running running, $todo todo");
757 $progress_is_dirty = 0;
# Body of the child-reaping routine (its `sub` line is not visible in this
# excerpt). Reaps at most one exited child per call without blocking and
# returns 0 when none has exited.
764 my $pid = waitpid (-1, WNOHANG);
765 return 0 if $pid <= 0;
# Look up what the child was running and where.
767 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
769 . $slot[$proc{$pid}->{slot}]->{cpu});
770 my $jobstepid = $proc{$pid}->{jobstep};
771 my $elapsed = time - $proc{$pid}->{time};
772 my $Jobstep = $jobstep[$jobstepid];
775 my $exitinfo = "exit $exitcode";
# The verdict comes from the task record's 'success' field after a reload,
# not from the exit code.
776 $Jobstep->{'arvados_task'}->reload;
777 my $success = $Jobstep->{'arvados_task'}->{success};
779 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
781 if (!defined $success) {
782 # task did not indicate one way or the other --> fail
783 $Jobstep->{'arvados_task'}->{success} = 0;
784 $Jobstep->{'arvados_task'}->save;
# Failure path. A node failure recorded for this step means the attempt is
# not counted against it.
790 my $no_incr_attempts;
791 $no_incr_attempts = 1 if $Jobstep->{node_fail};
794 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
796 # Check for signs of a failed or misconfigured node
# The node's losing streak is incremented and compared with 2 + its CPU count.
797 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
798 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
799 # Don't count this against jobstep failure thresholds if this
800 # node is already suspected faulty and srun exited quickly
801 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
803 $Jobstep->{attempts} > 1) {
804 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
805 $no_incr_attempts = 1;
# NOTE(review): attempts is decremented here AND again below by
# `--$Jobstep->{attempts} if $no_incr_attempts`, which this branch has just
# enabled; if both run on the same path that is a double decrement -- confirm.
806 --$Jobstep->{attempts};
808 ban_node_by_slot($proc{$pid}->{slot});
# Requeue the failed step.
811 push @jobstep_todo, $jobstepid;
812 Log ($jobstepid, "failure in $elapsed seconds");
814 --$Jobstep->{attempts} if $no_incr_attempts;
815 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: clear the node's failure state and record the step as done.
819 ++$thisround_succeeded;
820 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
821 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
822 push @jobstep_done, $jobstepid;
823 Log ($jobstepid, "success in $elapsed seconds");
# Common tail: record exit details, flush remaining stderr, release the pipe
# and return the slot to the free list.
825 $Jobstep->{exitcode} = $exitcode;
826 $Jobstep->{finishtime} = time;
827 process_stderr ($jobstepid, $success);
828 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
830 close $reader{$jobstepid};
831 delete $reader{$jobstepid};
832 delete $slot[$proc{$pid}->{slot}]->{pid};
833 push @freeslot, $proc{$pid}->{slot};
# Pick up tasks created by the task that just finished (ordered by qsequence)
# and queue each as a new jobstep at the level given by its 'sequence'.
837 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
839 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
841 'order' => 'qsequence'
843 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
845 'level' => $arvados_task->{'sequence'},
847 'arvados_task' => $arvados_task
849 push @jobstep, $jobstep;
850 push @jobstep_todo, $#jobstep;
853 $progress_is_dirty = 1;
# Body of the squeue-checking routine (its `sub` line and most control
# structure are not visible in this excerpt). Two rate limits apply: the kill
# list is examined at most every 4 seconds, squeue itself at most every 60.
860 # return if the kill list was checked <4 seconds ago
861 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
865 $squeue_kill_checked = time;
867 # use killem() on procs whose killtime is reached
870 if (exists $proc{$_}->{killtime}
871 && $proc{$_}->{killtime} <= time)
877 # return if the squeue was checked <60 seconds ago
878 if (defined $squeue_checked && $squeue_checked > time - 60)
882 $squeue_checked = time;
886 # here is an opportunity to check for mysterious problems with local procs
890 # get a list of steps still running
# "&& echo ok" appends a sentinel line so that a failed squeue run can be told
# apart from an empty listing.
891 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
# Backtick output lines end in "\n"; this comparison presumably relies on the
# trailing newline being removed on a line not shown -- verify.
893 if ($squeue[-1] ne "ok")
899 # which of my jobsteps are running, according to squeue?
# Lines look like "<jobid>.<stepid> <name>"; only this job's entries count.
903 if (/^(\d+)\.(\d+) (\S+)/)
905 if ($1 eq $ENV{SLURM_JOBID})
912 # which of my active child procs (>60s old) were not mentioned by squeue?
915 if ($proc{$_}->{time} < time - 60
916 && !exists $ok{$proc{$_}->{jobstepname}}
917 && !exists $proc{$_}->{killtime})
919 # kill this proc if it hasn't exited in 30 seconds
920 $proc{$_}->{killtime} = time + 30;
# release_allocation: log the release and cancel the slurm job named by
# SLURM_JOBID. Any guard around these statements is on lines not visible in
# this excerpt. The result of scancel is not checked.
926 sub release_allocation
930 Log (undef, "release job allocation");
931 system "scancel $ENV{SLURM_JOBID}";
# Drain every child's pipe (the `sub` line of this routine is not visible).
# %reader is keyed by jobstep index.
939 foreach my $job (keys %reader)
# Read in chunks of up to 8192 bytes until sysread returns nothing more (the
# handles were put in non-blocking mode when created).
942 while (0 < sysread ($reader{$job}, $buf, 8192))
944 print STDERR $buf if $ENV{CRUNCH_DEBUG};
945 $jobstep[$job]->{stderr} .= $buf;
# Complete lines are logged and removed from the buffer straight away.
946 preprocess_stderr ($job);
# Bound the buffer: once it exceeds 16384 bytes, drop its oldest 8192 bytes.
947 if (length ($jobstep[$job]->{stderr}) > 16384)
949 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: consume every complete line from a jobstep's buffered
# stderr, log it, and react to known srun error messages. The argument
# unpacking ($job) and the assignment of $line are on lines not visible here.
958 sub preprocess_stderr
# Loop while the buffer contains at least one newline-terminated line.
962 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the line and its newline from the front of the buffer.
964 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
965 Log ($job, "stderr $line");
# The allocation is gone or cannot be confirmed: request a freeze.
966 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
968 $main::please_freeze = 1;
# Node-level failure: mark the step so the attempt is not counted against it,
# and back off the node it ran on.
970 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
971 $jobstep[$job]->{node_fail} = 1;
972 ban_node_by_slot($jobstep[$job]->{slotindex});
# Final stderr flush for a finished step (the `sub` line is not visible):
# handle complete lines first, then log whatever remains in the buffer, split
# on newlines.
982 preprocess_stderr ($job);
985 Log ($job, "stderr $_");
986 } split ("\n", $jobstep[$job]->{stderr});
# Body of the output-collation routine (its `sub` line and loop header are not
# visible). Combines the outputs of successful tasks into one job output.
992 my $whc = Warehouse->new;
993 Log (undef, "collate");
994 $whc->write_start (1);
# Skip tasks with no output, without success, or with a non-zero exit code.
998 next if (!exists $_->{'arvados_task'}->{output} ||
999 !$_->{'arvados_task'}->{'success'} ||
1000 $_->{'exitcode'} != 0);
1001 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare 32-hex-digit locator (with optional "+hint"
# suffixes) is written through as-is.
1002 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
# Remember whether any block locator in the text carries a "+K" hint.
1004 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1005 $whc->write_data ($output);
# With a single jobstep, its locator becomes the job output directly.
1007 elsif (@jobstep == 1)
1009 $joboutput = $output;
# Otherwise fetch the block the locator names and append its content.
1012 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1014 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1015 $whc->write_data ($outblock);
# A failed fetch is recorded in the output itself rather than aborting.
1019 my $errstr = $whc->errstr;
1020 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
# Unless taken directly from the single step, the job output is whatever the
# write session produces.
1024 $joboutput = $whc->write_finish if !defined $joboutput;
1027 Log (undef, "output $joboutput");
1028 $Job->{'output'} = $joboutput;
1029 $Job->save if $job_has_uuid;
1033 Log (undef, "output undef");
# Signal escalation for one process ($_ is the pid; the `sub` line and loop
# header are not visible). Escalates to the next signal once the previous one
# was sent more than 4 seconds ago.
1043 my $sig = 2; # SIGINT first
1044 if (exists $proc{$_}->{"sent_$sig"} &&
1045 time - $proc{$_}->{"sent_$sig"} > 4)
1047 $sig = 15; # SIGTERM if SIGINT doesn't work
1049 if (exists $proc{$_}->{"sent_$sig"} &&
1050 time - $proc{$_}->{"sent_$sig"} > 4)
1052 $sig = 9; # SIGKILL if SIGTERM doesn't work
# Each signal level is sent only once per process; the send time is recorded
# under "sent_<signal>".
1054 if (!exists $proc{$_}->{"sent_$sig"})
1056 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1058 select (undef, undef, undef, 0.1);
1061 kill $sig, $_; # srun wants two SIGINT to really interrupt
1063 $proc{$_}->{"sent_$sig"} = time;
# Seconds the process had been running when it was signalled.
1064 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set the bit for the file descriptor of handle $_ in the bit vector $bits
# (presumably a select() mask; the surrounding routine is not visible).
1074 vec($bits,fileno($_),1) = 1;
# Log: write one log line to STDERR and, while it exists, to the metastream.
#   $_[0]  jobstep id, or undef for job-level messages
#   $_[1]  message text
# Line format: "<job_id> <pid> <jobstep> <message>".
1080 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split and handled line by line (how each line is
# emitted is on lines not visible in this excerpt).
1082 if ($_[1] =~ /\n/) {
1083 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1088 my $fh = select STDERR; $|=1; select $fh;
1089 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape every character outside the printable ASCII range (space to "~",
# octal 176) as a backslash followed by its 3-digit octal code.
1090 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A UTC timestamp (YYYY-MM-DD_hh:mm:ss) is built only when it will be used:
# for the metastream or for a terminal.
1093 if ($metastream || -t STDERR) {
1094 my @gmtime = gmtime;
1095 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1096 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp prefix only when it is a terminal.
1098 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
# The metastream copy always carries the timestamp.
1100 return if !$metastream;
1101 $metastream->write_data ($datetime . " " . $message);
# Fatal-error path (the `sub` line is not visible): log the message with the
# caller's file and line, then save as much state as possible.
1107 my ($package, $file, $line) = caller;
1108 my $message = "@_ at $file line $line\n";
1109 Log (undef, $message);
# Freeze and collate only when steps are still pending.
1110 freeze() if @jobstep_todo;
1111 collate_output() if @jobstep_todo;
# Final save of the log, provided the metastream still exists.
1113 save_meta() if $metastream;
# Mark the job as finished and failed in its record (the `sub` line is not
# visible). Does nothing for jobs without a real uuid.
# NOTE(review): after the early return, the `if $job_has_uuid` guards on
# reload and save below are always true.
1120 return if !$job_has_uuid;
1121 $Job->reload if $job_has_uuid;
1122 $Job->{'running'} = 0;
1123 $Job->{'success'} = 0;
# gmtime in scalar context: a ctime-style date string.
1124 $Job->{'finished_at'} = gmtime;
1125 $Job->save if $job_has_uuid;
# Save the accumulated log and record its locator on the job (the `sub` line
# is not visible; the step that finishes the stream is on a line not shown).
1131 my $justcheckpoint = shift; # false if this will be the last meta saved
1132 my $m = $metastream;
# For a checkpoint, work on a copy so the live stream keeps accumulating.
1133 $m = $m->copy if $justcheckpoint;
1135 my $loglocator = $m->as_key;
1136 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1137 Log (undef, "meta key is $loglocator");
1138 $Job->{'log'} = $loglocator;
1139 $Job->save if $job_has_uuid;
# freeze_if_want_freeze: when a freeze has been requested, release the
# allocation and kill and reap all child processes. Arguments are extra pids
# to include in the kill list. What follows the reaping is on lines not
# visible in this excerpt.
1143 sub freeze_if_want_freeze
1145 if ($main::please_freeze)
1147 release_allocation();
1150 # kill some srun procs before freeze+stop
# Register the given pids in %proc with empty records so killem() signals
# them too. (map is used only for its side effect.)
1151 map { $proc{$_} = {} } @_;
1154 killem (keys %proc);
1155 select (undef, undef, undef, 0.1);
# Reap everything that has exited so far, without blocking.
1157 while (($died = waitpid (-1, WNOHANG)) > 0)
1159 delete $proc{$died};
# Freeze/thaw support (the `sub` lines are not visible). Freeze only logs that
# it is not implemented.
1174 Log (undef, "Freeze not implemented");
# Thaw aborts immediately. NOTE(review): if the statements below belong to the
# same routine they are unreachable after this croak -- confirm.
1181 croak ("Thaw not implemented");
1185 Log (undef, "thaw from $key");
1190 @jobstep_tomerge = ();
1191 $jobstep_tomerge_level = 0;
# The frozen state is read from a stream addressed by the comma-separated key,
# one blank-line-terminated record at a time.
1194 my $stream = new Warehouse::Stream ( whc => $whc,
1195 hash => [split (",", $key)] );
1197 while (my $dataref = $stream->read_until (undef, "\n\n"))
# A record starting with "job " holds key=value lines describing the job.
1199 if ($$dataref =~ /^job /)
1201 foreach (split ("\n", $$dataref))
1203 my ($k, $v) = split ("=", $_, 2);
1204 $frozenjob->{$k} = freezeunquote ($v);
# A "merge <level> <list>" record restores the merge level and a merge list.
1209 if ($$dataref =~ /^merge (\d+) (.*)/)
1211 $jobstep_tomerge_level = $1;
1213 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
# Any other record is a jobstep: key=value lines, with attempts reset to 0.
1218 foreach (split ("\n", $$dataref))
1220 my ($k, $v) = split ("=", $_, 2);
1221 $Jobstep->{$k} = freezeunquote ($v) if $k;
1223 $Jobstep->{attempts} = 0;
1224 push @jobstep, $Jobstep;
# Steps whose exit code is the string "0" count as done; the rest are queued.
1226 if ($Jobstep->{exitcode} eq "0")
1228 push @jobstep_done, $#jobstep;
1232 push @jobstep_todo, $#jobstep;
# Restore the main job attributes from the frozen job and save.
1236 foreach (qw (script script_version script_parameters))
1238 $Job->{$_} = $frozenjob->{$_};
1240 $Job->save if $job_has_uuid;
# Undo backslash escaping in $s: "\n" becomes a newline, and any other
# backslash-escaped character becomes the character itself.
1256 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Body of the srun() wrapper (its `sub` line, the $stdin argument unpacking
# and the exec call are on lines not visible in this excerpt).
#   arg 1  array ref: the srun command and its options
#   arg 2  array ref: the command to run
#   arg 3  optional hash ref of options; {fork => 1} runs via system()
1263 my $srunargs = shift;
1264 my $execargs = shift;
1265 my $opts = shift || {};
# Without slurm the command is run directly, with no srun prefix.
1267 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Debug trace of the command; arguments containing a space are single-quoted.
1268 print STDERR (join (" ",
1269 map { / / ? "'$_'" : $_ }
1272 if $ENV{CRUNCH_DEBUG};
# Feed $stdin to the command: open "-|" forks, and the forked child's STDOUT
# becomes this process's STDIN; the child prints the payload and closes.
1274 if (defined $stdin) {
1275 my $child = open STDIN, "-|";
1276 defined $child or die "no fork: $!";
1278 print $stdin or die $!;
1279 close STDOUT or die $!;
# With {fork => 1}, run the command via system() and return its status.
1284 return system (@$args) if $opts->{fork};
# Reached only when the exec (on a line not shown) failed.
1287 warn "ENV size is ".length(join(" ",%ENV));
1288 die "exec failed: $!: @$args";
# ban_node_by_slot: put the node behind a slot on hold for 60 seconds and
# count the hold. The slot index argument ($slotid) is unpacked on a line not
# visible in this excerpt.
1292 sub ban_node_by_slot {
1293 # Don't start any new jobsteps on this node for 60 seconds
1295 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
# hold_count feeds the main loop's "every node has failed" check.
1296 $slot[$slotid]->{node}->{hold_count}++;
1297 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1303 # checkout-and-build
# NOTE(review): this section looks like the script read through <DATA> and
# sent to remote nodes; if so, comments here travel with it (harmless to
# Perl) -- confirm.
# Inputs come from the environment set up by the caller.
1307 my $destdir = $ENV{"CRUNCH_SRC"};
1308 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1309 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Lock file beside the destination directory (the locking call itself is on a
# line not shown).
1311 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# "$destdir.commit" is a symlink whose target is the installed commit id; a
# match means this commit is already installed.
1313 if (readlink ("$destdir.commit") eq $commit) {
# Otherwise invalidate the marker and send all further output to a log file.
1317 unlink "$destdir.commit";
1318 open STDOUT, ">", "$destdir.log";
1319 open STDERR, ">&STDOUT";
# Unpack a tar stream into the destination directory (what is written to the
# pipe is on lines not shown).
1322 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1325 die "'tar -C $destdir -xf -' exited $?: $!";
# Install into $CRUNCH_INSTALL, or ./opt under the current directory.
1329 chomp ($pwd = `pwd`);
1330 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Prefer ./install.sh; fall back to ./tests/autotests.sh when only that exists.
1332 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1334 shell_or_die ("./tests/autotests.sh", $install_dir);
1335 } elsif (-e "./install.sh") {
1336 shell_or_die ("./install.sh", $install_dir);
# Publish the new commit marker: create the symlink under a temporary name,
# then rename it into place.
1340 unlink "$destdir.commit.new";
1341 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1342 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# Helper tail: echo the command when DEBUG is set, and die when it fails (the
# call that runs the command is on a line not shown).
1351 if ($ENV{"DEBUG"}) {
1352 print STDERR "@_\n";
1355 or die "@_ failed: $! exit 0x".sprintf("%x",$?);