2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --uuid x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
20 =head1 RUNNING JOBS LOCALLY
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
26 If the job succeeds, the job's output locator is printed on stdout.
28 While the job is running, the following signals are accepted:
32 =item control-C, SIGINT, SIGQUIT
34 Save a checkpoint, terminate any job tasks that are running, and stop.
38 Save a checkpoint and continue.
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
57 use Warehouse::Stream;
58 use IPC::System::Simple qw(capturex);
60 $ENV{"TMPDIR"} ||= "/tmp";
61 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
62 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
63 mkdir ($ENV{"CRUNCH_TMP"});
68 GetOptions('force-unlock' => \$force_unlock,
70 'resume-stash=s' => \$resume_stash,
73 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
74 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
83 $main::ENV{CRUNCH_DEBUG} = 1;
87 $main::ENV{CRUNCH_DEBUG} = 0;
92 my $arv = Arvados->new;
93 my $metastream = Warehouse::Stream->new;
95 $metastream->write_start('log.txt');
104 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
105 $User = $arv->{'users'}->{'current'}->execute;
106 if (!$force_unlock) {
107 if ($Job->{'is_locked_by'}) {
108 croak("Job is locked: " . $Job->{'is_locked_by'});
110 if (defined $Job->{'success'}) {  # null check: any recorded value (even 0 or "") means the job already finished
111 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
113 if ($Job->{'running'}) {
114 croak("Job 'running' flag is already set");
116 if ($Job->{'started_at'}) {
117 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
123 $Job = JSON::decode_json($jobspec);
127 map { croak ("No $_ specified") unless $Job->{$_} }
128 qw(script script_version script_parameters);
131 if (!defined $Job->{'uuid'}) {
132 chomp (my $hostname = `hostname -s`); $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);  # chomp the hostname itself; its newline would otherwise land mid-uuid
135 $job_id = $Job->{'uuid'};
139 $Job->{'resource_limits'} ||= {};
140 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
141 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
144 Log (undef, "check slurm allocation");
147 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
151 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
152 push @sinfo, "$localcpus localhost";
154 if (exists $ENV{SLURM_NODELIST})
156 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
160 my ($ncpus, $slurm_nodelist) = split;
161 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
164 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
167 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
170 foreach (split (",", $ranges))
183 push @nodelist, map {
185 $n =~ s/\[[-,\d]+\]/$_/;
192 push @nodelist, $nodelist;
195 foreach my $nodename (@nodelist)
197 Log (undef, "node $nodename - $ncpus slots");
198 my $node = { name => $nodename,
202 foreach my $cpu (1..$ncpus)
204 push @slot, { node => $node,
208 push @node, @nodelist;
213 # Ensure that we get one jobstep running on each allocated node before
214 # we start overloading nodes with concurrent steps
216 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
223 # Claim this job, and make sure nobody else does
225 $Job->{'is_locked_by'} = $User->{'uuid'};
226 $Job->{'started_at'} = gmtime;
227 $Job->{'running'} = 1;
228 $Job->{'success'} = undef;
229 $Job->{'tasks_summary'} = { 'failed' => 0,
233 unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {  # uuids are strings: == would compare only their numeric prefixes and match other holders
234 croak("Error while updating / locking job");
239 Log (undef, "start");
240 $SIG{'INT'} = sub { $main::please_freeze = 1; };
241 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
242 $SIG{'TERM'} = \&croak;
243 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
244 $SIG{'ALRM'} = sub { $main::please_info = 1; };
245 $SIG{'CONT'} = sub { $main::please_continue = 1; };
246 $main::please_freeze = 0;
247 $main::please_info = 0;
248 $main::please_continue = 0;
249 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
251 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
252 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
253 $ENV{"JOB_UUID"} = $job_id;
257 my @jobstep_todo = ();
258 my @jobstep_done = ();
259 my @jobstep_tomerge = ();
260 my $jobstep_tomerge_level = 0;
262 my $squeue_kill_checked;
263 my $output_in_keep = 0;
267 if (defined $Job->{thawedfromkey})
269 thaw ($Job->{thawedfromkey});
273 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
274 'job_uuid' => $Job->{'uuid'},
279 push @jobstep, { 'level' => 0,
281 'arvados_task' => $first_task,
283 push @jobstep_todo, 0;
290 $build_script = <DATA>;
294 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
296 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
299 $ENV{"CRUNCH_SRC"} = $Job->{revision};
303 Log (undef, "Install revision ".$Job->{revision});
304 my $nodelist = join(",", @node);
306 # Clean out crunch_tmp/work and crunch_tmp/opt
308 my $cleanpid = fork();
311 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
312 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
317 last if $cleanpid == waitpid (-1, WNOHANG);
318 freeze_if_want_freeze ($cleanpid);
319 select (undef, undef, undef, 0.1);
321 Log (undef, "Clean-work-dir exited $?");
323 # Install requested code version
326 my @srunargs = ("srun",
327 "--nodelist=$nodelist",
328 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
330 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
331 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
332 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
335 my $treeish = $Job->{'script_version'};
336 my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
337 # Todo: let script_version specify alternate repo
338 $ENV{"CRUNCH_SRC_URL"} = $repo;
340 # Create/update our clone of the remote git repo
342 if (!-d $ENV{"CRUNCH_SRC"}) {
343 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
344 or croak ("git clone $repo failed: exit ".($?>>8));
345 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
347 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
349 # If this looks like a subversion r#, look for it in git-svn commit messages
351 if ($treeish =~ m{^\d{1,4}$}) {
352 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
354 if ($gitlog =~ /^[a-f0-9]{40}$/) {
356 Log (undef, "Using commit $commit for revision $treeish");
360 # If that didn't work, try asking git to look it up as a tree-ish.
362 if (!defined $commit) {
364 my $cooked_treeish = $treeish;
365 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
366 # Looks like a git branch name -- make sure git knows it's
367 # relative to the remote repo
368 $cooked_treeish = "origin/$treeish";
371 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
373 if ($found =~ /^[0-9a-f]{40}$/s) {
375 if ($commit ne $treeish) {
376 # Make sure we record the real commit id in the database,
377 # frozentokey, logs, etc. -- instead of an abbreviation or a
378 # branch name which can become ambiguous or point to a
379 # different commit in the future.
380 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
381 Log (undef, "Using commit $commit for tree-ish $treeish");
382 if ($commit ne $treeish) {
383 $Job->{'script_version'} = $commit;
384 $Job->save() or croak("Error while updating job");
390 if (defined $commit) {
391 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
392 @execargs = ("sh", "-c",
393 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
396 croak ("could not figure out commit id for $treeish");
399 my $installpid = fork();
400 if ($installpid == 0)
402 srun (\@srunargs, \@execargs, {}, $build_script);
407 last if $installpid == waitpid (-1, WNOHANG);
408 freeze_if_want_freeze ($installpid);
409 select (undef, undef, undef, 0.1);
411 Log (undef, "Install exited $?");
416 foreach (qw (script script_version script_parameters resource_limits))
420 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
422 foreach (split (/\n/, $Job->{knobs}))
424 Log (undef, "knob " . $_);
435 my $thisround_succeeded = 0;
436 my $thisround_failed = 0;
437 my $thisround_failed_multiple = 0;
439 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
440 or $a <=> $b } @jobstep_todo;
441 my $level = $jobstep[$jobstep_todo[0]]->{level};
442 Log (undef, "start level $level");
447 my @freeslot = (0..$#slot);
450 my $progress_is_dirty = 1;
451 my $progress_stats_updated = 0;
453 update_progress_stats();
458 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
460 $main::please_continue = 0;
462 my $id = $jobstep_todo[$todo_ptr];
463 my $Jobstep = $jobstep[$id];
464 if ($Jobstep->{level} != $level)
468 if ($Jobstep->{attempts} > 9)
470 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
475 pipe $reader{$id}, "writer" or croak ($!);
476 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
477 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
479 my $childslot = $freeslot[0];
480 my $childnode = $slot[$childslot]->{node};
481 my $childslotname = join (".",
482 $slot[$childslot]->{node}->{name},
483 $slot[$childslot]->{cpu});
484 my $childpid = fork();
487 $SIG{'INT'} = 'DEFAULT';
488 $SIG{'QUIT'} = 'DEFAULT';
489 $SIG{'TERM'} = 'DEFAULT';
491 foreach (values (%reader))
495 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
496 open(STDOUT,">&writer");
497 open(STDERR,">&writer");
502 delete $ENV{"GNUPGHOME"};
503 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
504 $ENV{"TASK_QSEQUENCE"} = $id;
505 $ENV{"TASK_SEQUENCE"} = $level;
506 $ENV{"JOB_SCRIPT"} = $Job->{script};
507 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
508 $param =~ tr/a-z/A-Z/;
509 $ENV{"JOB_PARAMETER_$param"} = $value;
511 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
512 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
513 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
514 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
520 "--nodelist=".$childnode->{name},
521 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
522 "--job-name=$job_id.$id.$$",
524 my @execargs = qw(sh);
525 my $build_script_to_send = "";
527 "mkdir -p $ENV{CRUNCH_TMP}/revision "
528 ."&& cd $ENV{CRUNCH_TMP} ";
531 $build_script_to_send = $build_script;
535 elsif (!$skip_install)
540 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
542 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
543 ." && ./installrevision "
547 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
549 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
550 my @execargs = ('bash', '-c', $command);
551 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
555 if (!defined $childpid)
562 $proc{$childpid} = { jobstep => $id,
565 jobstepname => "$job_id.$id.$childpid",
567 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
568 $slot[$childslot]->{pid} = $childpid;
570 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
571 Log ($id, "child $childpid started on $childslotname");
572 $Jobstep->{attempts} ++;
573 $Jobstep->{starttime} = time;
574 $Jobstep->{node} = $childnode->{name};
575 $Jobstep->{slotindex} = $childslot;
576 delete $Jobstep->{stderr};
577 delete $Jobstep->{finishtime};
579 splice @jobstep_todo, $todo_ptr, 1;
582 $progress_is_dirty = 1;
586 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
588 last THISROUND if $main::please_freeze;
589 if ($main::please_info)
591 $main::please_info = 0;
595 update_progress_stats();
603 update_progress_stats();
604 select (undef, undef, undef, 0.1);
606 elsif (time - $progress_stats_updated >= 30)
608 update_progress_stats();
610 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
611 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
613 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
614 .($thisround_failed+$thisround_succeeded)
615 .") -- giving up on this round";
616 Log (undef, $message);
620 # move slots from freeslot to holdslot (or back to freeslot) if necessary
621 for (my $i=$#freeslot; $i>=0; $i--) {
622 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
623 push @holdslot, (splice @freeslot, $i, 1);
626 for (my $i=$#holdslot; $i>=0; $i--) {
627 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
628 push @freeslot, (splice @holdslot, $i, 1);
632 # give up if no nodes are succeeding
633 if (!grep { $_->{node}->{losing_streak} == 0 &&
634 $_->{node}->{hold_count} < 4 } @slot) {
635 my $message = "Every node has failed -- giving up on this round";
636 Log (undef, $message);
643 push @freeslot, splice @holdslot;
644 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
647 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
650 goto THISROUND if $main::please_continue;
651 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
656 update_progress_stats();
657 select (undef, undef, undef, 0.1);
658 killem (keys %proc) if $main::please_freeze;
662 update_progress_stats();
663 freeze_if_want_freeze();
666 if (!defined $success)
669 $thisround_succeeded == 0 &&
670 ($thisround_failed == 0 || $thisround_failed > 4))
672 my $message = "stop because $thisround_failed tasks failed and none succeeded";
673 Log (undef, $message);
682 goto ONELEVEL if !defined $success;
685 release_allocation();
687 $Job->{'output'} = &collate_output();
688 $Job->{'success'} = $Job->{'output'} && $success;
691 if ($Job->{'output'})
694 my $manifest_text = capturex("whget", $Job->{'output'});
695 $arv->{'collections'}->{'create'}->execute('collection' => {
696 'uuid' => $Job->{'output'},
697 'manifest_text' => $manifest_text,
701 Log (undef, "Failed to register output manifest: $@");
705 Log (undef, "finish");
712 sub update_progress_stats
714 $progress_stats_updated = time;
715 return if !$progress_is_dirty;
716 my ($todo, $done, $running) = (scalar @jobstep_todo,
717 scalar @jobstep_done,
718 scalar @slot - scalar @freeslot - scalar @holdslot);
719 $Job->{'tasks_summary'} ||= {};
720 $Job->{'tasks_summary'}->{'todo'} = $todo;
721 $Job->{'tasks_summary'}->{'done'} = $done;
722 $Job->{'tasks_summary'}->{'running'} = $running;
724 Log (undef, "status: $done done, $running running, $todo todo");
725 $progress_is_dirty = 0;
732 my $pid = waitpid (-1, WNOHANG);
733 return 0 if $pid <= 0;
735 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
737 . $slot[$proc{$pid}->{slot}]->{cpu});
738 my $jobstepid = $proc{$pid}->{jobstep};
739 my $elapsed = time - $proc{$pid}->{time};
740 my $Jobstep = $jobstep[$jobstepid];
743 my $exitinfo = "exit $exitcode";
744 $Jobstep->{'arvados_task'}->reload;
745 my $success = $Jobstep->{'arvados_task'}->{success};
747 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
749 if (!defined $success) {
750 # task did not indicate one way or the other --> fail
751 $Jobstep->{'arvados_task'}->{success} = 0;
752 $Jobstep->{'arvados_task'}->save;
758 my $no_incr_attempts;
759 $no_incr_attempts = 1 if $Jobstep->{node_fail};
762 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
764 # Check for signs of a failed or misconfigured node
765 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
766 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
767 # Don't count this against jobstep failure thresholds if this
768 # node is already suspected faulty and srun exited quickly
769 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
771 $Jobstep->{attempts} > 1) {
772 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
773 $no_incr_attempts = 1;
774 --$Jobstep->{attempts};
776 ban_node_by_slot($proc{$pid}->{slot});
779 push @jobstep_todo, $jobstepid;
780 Log ($jobstepid, "failure in $elapsed seconds");
782 --$Jobstep->{attempts} if $no_incr_attempts;
783 $Job->{'tasks_summary'}->{'failed'}++;
787 ++$thisround_succeeded;
788 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
789 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
790 push @jobstep_done, $jobstepid;
791 Log ($jobstepid, "success in $elapsed seconds");
793 $Jobstep->{exitcode} = $exitcode;
794 $Jobstep->{finishtime} = time;
795 process_stderr ($jobstepid, $success);
796 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
798 close $reader{$jobstepid};
799 delete $reader{$jobstepid};
800 delete $slot[$proc{$pid}->{slot}]->{pid};
801 push @freeslot, $proc{$pid}->{slot};
805 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
807 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
809 'order' => 'qsequence'
811 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
813 'level' => $arvados_task->{'sequence'},
815 'arvados_task' => $arvados_task
817 push @jobstep, $jobstep;
818 push @jobstep_todo, $#jobstep;
821 $progress_is_dirty = 1;
828 # return if the kill list was checked <4 seconds ago
829 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
833 $squeue_kill_checked = time;
835 # use killem() on procs whose killtime is reached
838 if (exists $proc{$_}->{killtime}
839 && $proc{$_}->{killtime} <= time)
845 # return if the squeue was checked <60 seconds ago
846 if (defined $squeue_checked && $squeue_checked > time - 60)
850 $squeue_checked = time;
854 # here is an opportunity to check for mysterious problems with local procs
858 # get a list of steps still running
859 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
861 if ($squeue[-1] ne "ok")
867 # which of my jobsteps are running, according to squeue?
871 if (/^(\d+)\.(\d+) (\S+)/)
873 if ($1 eq $ENV{SLURM_JOBID})
880 # which of my active child procs (>60s old) were not mentioned by squeue?
883 if ($proc{$_}->{time} < time - 60
884 && !exists $ok{$proc{$_}->{jobstepname}}
885 && !exists $proc{$_}->{killtime})
887 # kill this proc if it hasn't exited in 30 seconds
888 $proc{$_}->{killtime} = time + 30;
894 sub release_allocation
898 Log (undef, "release job allocation");
899 system "scancel $ENV{SLURM_JOBID}";
907 foreach my $job (keys %reader)
910 while (0 < sysread ($reader{$job}, $buf, 8192))
912 print STDERR $buf if $ENV{CRUNCH_DEBUG};
913 $jobstep[$job]->{stderr} .= $buf;
914 preprocess_stderr ($job);
915 if (length ($jobstep[$job]->{stderr}) > 16384)
917 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
926 sub preprocess_stderr
930 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
932 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
933 Log ($job, "stderr $line");
934 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
936 $main::please_freeze = 1;
938 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
939 $jobstep[$job]->{node_fail} = 1;
940 ban_node_by_slot($jobstep[$job]->{slotindex});
950 preprocess_stderr ($job);
953 Log ($job, "stderr $_");
954 } split ("\n", $jobstep[$job]->{stderr});
960 my $whc = Warehouse->new;
961 Log (undef, "collate");
962 $whc->write_start (1);
966 next if (!exists $_->{'arvados_task'}->{output} ||
967 !$_->{'arvados_task'}->{'success'} ||
968 $_->{'exitcode'} != 0);
969 my $output = $_->{'arvados_task'}->{output};
970 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
972 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
973 $whc->write_data ($output);
975 elsif (@jobstep == 1)
977 $joboutput = $output;
980 elsif (defined (my $outblock = $whc->fetch_block ($output)))
982 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
983 $whc->write_data ($outblock);
987 my $errstr = $whc->errstr;
988 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
992 $joboutput = $whc->write_finish if !defined $joboutput;
995 Log (undef, "output $joboutput");
996 $Job->{'output'} = $joboutput;
1001 Log (undef, "output undef");
1011 my $sig = 2; # SIGINT first
1012 if (exists $proc{$_}->{"sent_$sig"} &&
1013 time - $proc{$_}->{"sent_$sig"} > 4)
1015 $sig = 15; # SIGTERM if SIGINT doesn't work
1017 if (exists $proc{$_}->{"sent_$sig"} &&
1018 time - $proc{$_}->{"sent_$sig"} > 4)
1020 $sig = 9; # SIGKILL if SIGTERM doesn't work
1022 if (!exists $proc{$_}->{"sent_$sig"})
1024 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1026 select (undef, undef, undef, 0.1);
1029 kill $sig, $_; # srun wants two SIGINT to really interrupt
1031 $proc{$_}->{"sent_$sig"} = time;
1032 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1042 vec($bits,fileno($_),1) = 1;
1048 sub Log # ($jobstep_id, $logmessage)
1050 if ($_[1] =~ /\n/) {
1051 for my $line (split (/\n/, $_[1])) {
1056 my $fh = select STDERR; $|=1; select $fh;
1057 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1058 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1061 if ($metastream || -t STDERR) {
1062 my @gmtime = gmtime;
1063 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1064 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1066 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1068 return if !$metastream;
1069 $metastream->write_data ($datetime . " " . $message);
1073 sub reconnect_database
1075 return if !$job_has_uuid;
1076 return if ($dbh && $dbh->do ("select now()"));
1079 $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1081 $dbh->{InactiveDestroy} = 1;
1084 warn ($DBI::errstr);
1087 croak ($DBI::errstr) if !$dbh;
1093 return 1 if !$job_has_uuid;
1094 my $ret = $dbh->do (@_);
1095 return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1096 reconnect_database();
1097 return $dbh->do (@_);
1103 my ($package, $file, $line) = caller;
1104 my $message = "@_ at $file line $line\n";
1105 Log (undef, $message);
1106 freeze() if @jobstep_todo;
1107 collate_output() if @jobstep_todo;
1109 save_meta() if $metastream;
1116 return if !$job_has_uuid;
1118 $Job->{'running'} = 0;
1119 $Job->{'success'} = 0;
1120 $Job->{'finished_at'} = gmtime;
1127 my $justcheckpoint = shift; # false if this will be the last meta saved
1128 my $m = $metastream;
1129 $m = $m->copy if $justcheckpoint;
1131 my $loglocator = $m->as_key;
1132 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1133 Log (undef, "meta key is $loglocator");
1134 $Job->{'log'} = $loglocator;
1139 sub freeze_if_want_freeze
1141 if ($main::please_freeze)
1143 release_allocation();
1146 # kill some srun procs before freeze+stop
1147 map { $proc{$_} = {} } @_;
1150 killem (keys %proc);
1151 select (undef, undef, undef, 0.1);
1153 while (($died = waitpid (-1, WNOHANG)) > 0)
1155 delete $proc{$died};
1170 Log (undef, "Freeze not implemented");
1174 Log (undef, "freeze");
1176 my $freezer = new Warehouse::Stream (whc => $whc);
1178 $freezer->name (".");
1179 $freezer->write_start ("state.txt");
1181 $freezer->write_data (join ("\n",
1185 $_ . "=" . freezequote($Job->{$_})
1186 } grep { $_ ne "id" } keys %$Job) . "\n\n");
1188 foreach my $Jobstep (@jobstep)
1190 my $str = join ("\n",
1193 $_ . "=" . freezequote ($Jobstep->{$_})
1195 $_ !~ /^stderr|slotindex|node_fail/
1197 $freezer->write_data ($str."\n\n");
1199 if (@jobstep_tomerge)
1201 $freezer->write_data
1202 ("merge $jobstep_tomerge_level "
1203 . freezequote (join ("\n",
1204 map { freezequote ($_) } @jobstep_tomerge))
1208 $freezer->write_finish;
1209 my $frozentokey = $freezer->as_key;
1211 Log (undef, "frozento key is $frozentokey");
1212 dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1213 $frozentokey, $job_id);
1214 my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1215 Log (undef, "frozento+K key is $kfrozentokey");
1216 return $frozentokey;
1222 croak ("Thaw not implemented");
1226 Log (undef, "thaw from $key");
1231 @jobstep_tomerge = ();
1232 $jobstep_tomerge_level = 0;
1235 my $stream = new Warehouse::Stream ( whc => $whc,
1236 hash => [split (",", $key)] );
1238 while (my $dataref = $stream->read_until (undef, "\n\n"))
1240 if ($$dataref =~ /^job /)
1242 foreach (split ("\n", $$dataref))
1244 my ($k, $v) = split ("=", $_, 2);
1245 $frozenjob->{$k} = freezeunquote ($v);
1250 if ($$dataref =~ /^merge (\d+) (.*)/)
1252 $jobstep_tomerge_level = $1;
1254 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1259 foreach (split ("\n", $$dataref))
1261 my ($k, $v) = split ("=", $_, 2);
1262 $Jobstep->{$k} = freezeunquote ($v) if $k;
1264 $Jobstep->{attempts} = 0;
1265 push @jobstep, $Jobstep;
1267 if ($Jobstep->{exitcode} eq "0")
1269 push @jobstep_done, $#jobstep;
1273 push @jobstep_todo, $#jobstep;
1277 foreach (qw (script script_version script_parameters))
1279 $Job->{$_} = $frozenjob->{$_};
1297 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1304 my $srunargs = shift;
1305 my $execargs = shift;
1306 my $opts = shift || {};
1308 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1309 print STDERR (join (" ",
1310 map { / / ? "'$_'" : $_ }
1313 if $ENV{CRUNCH_DEBUG};
1315 if (defined $stdin) {
1316 my $child = open STDIN, "-|";
1317 defined $child or die "no fork: $!";
1319 print $stdin or die $!;
1320 close STDOUT or die $!;
1325 return system (@$args) if $opts->{fork};
1328 warn "ENV size is ".length(join(" ",%ENV));
1329 die "exec failed: $!: @$args";
1333 sub ban_node_by_slot {
1334 # Don't start any new jobsteps on this node for 60 seconds
1336 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1337 $slot[$slotid]->{node}->{hold_count}++;
1338 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1344 # checkout-and-build
1348 my $destdir = $ENV{"CRUNCH_SRC"};
1349 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1350 my $repo = $ENV{"CRUNCH_SRC_URL"};
1352 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1354 if (readlink ("$destdir.commit") eq $commit) {
1358 open STDOUT, ">", "$destdir.log";
1359 open STDERR, ">&STDOUT";
1361 if (-d "$destdir/.git") {
1362 chdir $destdir or die "chdir $destdir: $!";
1363 if (0 != system (qw(git remote set-url origin), $repo)) {
1364 # awful... for old versions of git that don't know "remote set-url"
1365 shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1368 elsif ($repo && $commit)
1370 shell_or_die('git', 'clone', $repo, $destdir);
1371 chdir $destdir or die "chdir $destdir: $!";
1372 shell_or_die(qw(git config clean.requireForce false));
1375 die "$destdir does not exist, and no repo/commit specified -- giving up";
1379 unlink "$destdir.commit";
1380 shell_or_die (qw(git stash));
1381 shell_or_die (qw(git clean -d -x));
1382 shell_or_die (qw(git fetch origin));
1383 shell_or_die (qw(git checkout), $commit);
1387 chomp ($pwd = `pwd`);
1388 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1390 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1392 shell_or_die ("./tests/autotests.sh", $install_dir);
1393 } elsif (-e "./install.sh") {
1394 shell_or_die ("./install.sh", $install_dir);
1398 unlink "$destdir.commit.new";
1399 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1400 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1409 if ($ENV{"DEBUG"}) {
1410 print STDERR "@_\n";
1413 or die "@_ failed: $! exit 0x".sprintf("%x",$?);