2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --uuid x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
20 =head1 RUNNING JOBS LOCALLY
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
26 If the job succeeds, the job's output locator is printed on stdout.
28 While the job is running, the following signals are accepted:
32 =item control-C, SIGINT, SIGQUIT
34 Save a checkpoint, terminate any job tasks that are running, and stop.
38 Save a checkpoint and continue.
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
57 use Warehouse::Stream;
58 use IPC::System::Simple qw(capturex);
# Establish the scratch-directory environment: default TMPDIR to /tmp, then
# derive CRUNCH_TMP (TMPDIR/crunch-job) and CRUNCH_WORK (CRUNCH_TMP/work).
60 $ENV{"TMPDIR"} ||= "/tmp";
61 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
62 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
# The mkdir result is not checked: any failure (including "already exists")
# is ignored at this point.
63 mkdir ($ENV{"CRUNCH_TMP"});
# Command-line options (only part of the GetOptions call is visible in this
# extract).
68 GetOptions('force-unlock' => \$force_unlock,
70 'resume-stash=s' => \$resume_stash,
# Slurm mode requires both SLURM_JOBID and SLURM_NODELIST in the environment.
73 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A jobspec consisting only of lowercase letters, digits and hyphens is taken
# to be a job UUID.
74 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
83 $main::ENV{CRUNCH_DEBUG} = 1;
87 $main::ENV{CRUNCH_DEBUG} = 0;
# Client object used later for the jobs / users / job_tasks / collections calls.
92 my $arv = Arvados->new;
# Log() appends every message to this stream for as long as $metastream is set.
93 my $metastream = Warehouse::Stream->new;
95 $metastream->write_start('log.txt');
# UUID mode: fetch the job record and the current user through the client,
# then (unless --force-unlock was given) refuse to run a job that already
# looks locked, finished, running or started.
104 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
105 $User = $arv->{'users'}->{'current'}->execute;
106 if (!$force_unlock) {
107 if ($Job->{'is_locked_by'}) {
108 croak("Job is locked: " . $Job->{'is_locked_by'});
# NOTE(review): `ne undef` compares against undef stringified to "", so this
# is really a "not the empty string" test (and warns under `use warnings`);
# `defined $Job->{'success'}` would state the "is not null" intent directly.
110 if ($Job->{'success'} ne undef) {
111 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
113 if ($Job->{'running'}) {
114 croak("Job 'running' flag is already set");
116 if ($Job->{'started_at'}) {
117 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Inline mode: the jobspec is a JSON document describing the job.
123 $Job = JSON::decode_json($jobspec);
# Each of these fields must be present and true, otherwise croak.
# NOTE(review): map is used in void context purely for its side effect; a
# plain foreach would say the same thing.
127 map { croak ("No $_ specified") unless $Job->{$_} }
128 qw(script script_version script_parameters);
# Jobs without a uuid get a synthetic one built from the short hostname, the
# current time and this process id.
# NOTE(review): chomp is applied to the finished sprintf result, so the
# newline that `hostname -s` emits appears to stay embedded before "-t";
# chomp only removes a trailing newline. Confirm and chomp the hostname first.
131 if (!defined $Job->{'uuid'}) {
132 chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
135 $job_id = $Job->{'uuid'};
# Default resource limits; a max_tasks_per_node of 0 means "no cap" (the cap
# is only applied when $max_ncpus is true).
139 $Job->{'resource_limits'} ||= {};
140 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
141 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
# Build the list of nodes and per-node execution slots. Several lines of this
# section (loop headers, braces) are not visible in this extract.
144 Log (undef, "check slurm allocation");
147 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Count "processor" lines in /proc/cpuinfo; fall back to 1 when the count is 0
# or the command produced nothing.
151 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
152 push @sinfo, "$localcpus localhost";
154 if (exists $ENV{SLURM_NODELIST})
# Each sinfo output line is "<cpu count> <node list>" for the allocated nodes.
156 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Split one @sinfo entry (in $_) on whitespace and apply the per-node task cap.
160 my ($ncpus, $slurm_nodelist) = split;
161 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated entry at a time off the front of the node list; an
# entry may carry a bracketed range expression such as name[1-3,5].
164 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
167 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
170 foreach (split (",", $ranges))
183 push @nodelist, map {
# Substitute the bracket expression with one concrete value.
185 $n =~ s/\[[-,\d]+\]/$_/;
192 push @nodelist, $nodelist;
195 foreach my $nodename (@nodelist)
197 Log (undef, "node $nodename - $ncpus slots");
198 my $node = { name => $nodename,
202 foreach my $cpu (1..$ncpus)
204 push @slot, { node => $node,
208 push @node, @nodelist;
213 # Ensure that we get one jobstep running on each allocated node before
214 # we start overloading nodes with concurrent steps
216 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
223 # Claim this job, and make sure nobody else does
# Mark the job as locked by the current user, started now and running, with
# success reset to undef, then save and verify the lock.
225 $Job->{'is_locked_by'} = $User->{'uuid'};
226 $Job->{'started_at'} = time;
227 $Job->{'running'} = 1;
228 $Job->{'success'} = undef;
229 $Job->{'tasks_summary'} = { 'failed' => 0,
# NOTE(review): `==` compares numerically. If the uuids are non-numeric
# strings they both numify (typically to 0), so two different uuids can
# compare equal here; `eq` looks like the intended operator -- confirm.
233 unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
234 croak("Error while updating / locking job");
239 Log (undef, "start");
# Signal handlers only set flags that the main loop polls:
#   INT / QUIT / TSTP -> please_freeze
#   ALRM              -> please_info
#   CONT              -> please_continue
# TERM invokes croak directly.
240 $SIG{'INT'} = sub { $main::please_freeze = 1; };
241 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
242 $SIG{'TERM'} = \&croak;
243 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
244 $SIG{'ALRM'} = sub { $main::please_info = 1; };
245 $SIG{'CONT'} = sub { $main::please_continue = 1; };
246 $main::please_freeze = 0;
247 $main::please_info = 0;
248 $main::please_continue = 0;
249 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every "NOCACHE...=value" line of the job's knobs into the environment.
# NOTE(review): grep is used in void context only for this side effect.
251 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
# Export the job id under both names.
252 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
253 $ENV{"JOB_UUID"} = $job_id;
# Task bookkeeping. @jobstep_todo and @jobstep_done hold indexes into
# @jobstep, not the task records themselves.
257 my @jobstep_todo = ();
258 my @jobstep_done = ();
259 my @jobstep_tomerge = ();
260 my $jobstep_tomerge_level = 0;
# Timestamp of the last kill-list check in the squeue polling code.
262 my $squeue_kill_checked;
263 my $output_in_keep = 0;
# Either restore state from a previous freeze, or (on lines only partly
# visible here) create the initial task at level 0 and queue it as index 0.
267 if (defined $Job->{thawedfromkey})
269 thaw ($Job->{thawedfromkey});
273 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
274 'job_uuid' => $Job->{'uuid'},
279 push @jobstep, { 'level' => 0,
281 'arvados_task' => $first_task,
283 push @jobstep_todo, 0;
# The build script text is read from the DATA handle.
# NOTE(review): presumably $/ is localized to undef on a line not shown so
# the whole section is slurped -- confirm.
290 $build_script = <DATA>;
294 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
# Installation is skipped when not running under slurm and the revision is an
# absolute path (starts with "/"); the path is then used as CRUNCH_SRC as is.
296 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
299 $ENV{"CRUNCH_SRC"} = $Job->{revision};
303 Log (undef, "Install revision ".$Job->{revision});
304 my $nodelist = join(",", @node);
306 # Clean out crunch_tmp/work and crunch_tmp/opt
# A forked child runs the cleanup command through srun() on every node while
# the parent polls for its exit every 0.1s, honouring freeze requests.
308 my $cleanpid = fork();
311 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
312 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
317 last if $cleanpid == waitpid (-1, WNOHANG);
318 freeze_if_want_freeze ($cleanpid);
319 select (undef, undef, undef, 0.1);
321 Log (undef, "Clean-work-dir exited $?");
323 # Install requested code version
# srun arguments used to run the installer on all nodes of the allocation.
326 my @srunargs = ("srun",
327 "--nodelist=$nodelist",
328 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
# Environment consumed by the build script: requested commit, source checkout
# directory and install prefix.
330 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
331 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
332 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
335 my $treeish = $Job->{'script_version'};
336 my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
337 # Todo: let script_version specify alternate repo
338 $ENV{"CRUNCH_SRC_URL"} = $repo;
340 # Create/update our clone of the remote git repo
# Clone only when the checkout directory does not exist yet. The clone uses
# list-form system (no shell); the follow-up git config goes through a shell
# and its status is ignored.
342 if (!-d $ENV{"CRUNCH_SRC"}) {
343 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
344 or croak ("git clone $repo failed: exit ".($?>>8));
345 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
# Point origin at the configured URL (the shell expands $CRUNCH_SRC_URL from
# the environment set above) and fetch. Output and exit status are discarded.
347 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
349 # If this looks like a subversion r#, look for it in git-svn commit messages
351 if ($treeish =~ m{^\d{1,4}$}) {
352 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
# Accept the result only when it is exactly one 40-hex-digit commit id.
354 if ($gitlog =~ /^[a-f0-9]{40}$/) {
356 Log (undef, "Using commit $commit for revision $treeish");
360 # If that didn't work, try asking git to look it up as a tree-ish.
362 if (!defined $commit) {
364 my $cooked_treeish = $treeish;
365 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
366 # Looks like a git branch name -- make sure git knows it's
367 # relative to the remote repo
368 $cooked_treeish = "origin/$treeish";
# NOTE(review): $cooked_treeish comes from the job's script_version and is
# interpolated unquoted into a shell command line; shell metacharacters in it
# would be interpreted by the shell.
371 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
373 if ($found =~ /^[0-9a-f]{40}$/s) {
375 if ($commit ne $treeish) {
376 # Make sure we record the real commit id in the database,
377 # frozentokey, logs, etc. -- instead of an abbreviation or a
378 # branch name which can become ambiguous or point to a
379 # different commit in the future.
380 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
381 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this repeats the condition already tested a few lines above.
382 if ($commit ne $treeish) {
383 $Job->{'script_version'} = $commit;
384 $Job->save() or croak("Error while updating job");
390 if (defined $commit) {
391 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
# The installer is "perl -", i.e. perl reading its program from stdin; the
# build script text is supplied as stdin by the srun() call below.
392 @execargs = ("sh", "-c",
393 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
396 croak ("could not figure out commit id for $treeish");
# Run the install in a forked child; the parent polls for its exit every 0.1s
# and honours freeze requests in the meantime.
399 my $installpid = fork();
400 if ($installpid == 0)
402 srun (\@srunargs, \@execargs, {}, $build_script);
407 last if $installpid == waitpid (-1, WNOHANG);
408 freeze_if_want_freeze ($installpid);
409 select (undef, undef, undef, 0.1);
411 Log (undef, "Install exited $?");
# Record the job definition in the log, one line per field and per knob.
# NOTE(review): script_parameters and resource_limits are hash references
# elsewhere in this file, so concatenating them logs "HASH(0x...)" rather
# than their contents.
416 foreach (qw (script script_version script_parameters resource_limits))
418 Log (undef, $_ . " " . $Job->{$_});
420 foreach (split (/\n/, $Job->{knobs}))
422 Log (undef, "knob " . $_);
# Per-round counters, reset at the start of each level.
433 my $thisround_succeeded = 0;
434 my $thisround_failed = 0;
435 my $thisround_failed_multiple = 0;
# Order pending tasks by level, then by index, and work on the lowest level.
437 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
438 or $a <=> $b } @jobstep_todo;
439 my $level = $jobstep[$jobstep_todo[0]]->{level};
440 Log (undef, "start level $level");
# Every slot index starts out free.
445 my @freeslot = (0..$#slot);
# Force the first update_progress_stats() call to publish a status line.
448 my $progress_is_dirty = 1;
449 my $progress_stats_updated = 0;
451 update_progress_stats();
# Walk the pending tasks of the current level and start each one in a forked
# child on a free slot. Many structural lines of this loop (braces, else
# branches) are not visible in this extract.
456 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
458 $main::please_continue = 0;
460 my $id = $jobstep_todo[$todo_ptr];
461 my $Jobstep = $jobstep[$id];
462 if ($Jobstep->{level} != $level)
466 if ($Jobstep->{attempts} > 9)
468 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
# Pipe for the child's output: the read end is stored per task and switched
# to non-blocking; the write end is the bareword handle named "writer".
473 pipe $reader{$id}, "writer" or croak ($!);
474 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
475 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Take the first free slot for this task.
477 my $childslot = $freeslot[0];
478 my $childnode = $slot[$childslot]->{node};
479 my $childslotname = join (".",
480 $slot[$childslot]->{node}->{name},
481 $slot[$childslot]->{cpu});
482 my $childpid = fork();
# Restore default signal handling (presumably inside the forked child; the
# branch test is on a line not shown).
485 $SIG{'INT'} = 'DEFAULT';
486 $SIG{'QUIT'} = 'DEFAULT';
487 $SIG{'TERM'} = 'DEFAULT';
489 foreach (values (%reader))
# NOTE(review): F_SETFL sets file status flags; close-on-exec is a descriptor
# flag normally changed with F_SETFD, so this call may not do what the
# trailing comment says -- confirm.
493 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
# Both stdout and stderr of the task go into the pipe. The open results are
# not checked.
494 open(STDOUT,">&writer");
495 open(STDERR,">&writer");
# Environment handed to the task.
500 delete $ENV{"GNUPGHOME"};
501 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
502 $ENV{"TASK_QSEQUENCE"} = $id;
503 $ENV{"TASK_SEQUENCE"} = $level;
504 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Each script parameter is exported as JOB_PARAMETER_<NAME>, with a-z in the
# name upper-cased.
505 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
506 $param =~ tr/a-z/A-Z/;
507 $ENV{"JOB_PARAMETER_$param"} = $value;
509 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
510 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
# The slot's cpu number is appended directly to CRUNCH_WORK, with no separator.
511 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
512 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
518 "--nodelist=".$childnode->{name},
519 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
520 "--job-name=$job_id.$id.$$",
522 my @execargs = qw(sh);
523 my $build_script_to_send = "";
525 "mkdir -p $ENV{CRUNCH_TMP}/revision "
526 ."&& cd $ENV{CRUNCH_TMP} ";
529 $build_script_to_send = $build_script;
533 elsif (!$skip_install)
538 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
540 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
541 ." && ./installrevision "
545 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
547 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
# NOTE(review): @execargs is declared with `my` a second time here; if both
# declarations share a scope the earlier qw(sh) value is never used.
548 my @execargs = ('bash', '-c', $command);
549 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
553 if (!defined $childpid)
# Parent-side record for the child. jobstepname uses $childpid, which equals
# the $$ the child sees when it builds its srun --job-name, so the two names
# match (assuming the srun arguments above are built in the child).
560 $proc{$childpid} = { jobstep => $id,
563 jobstepname => "$job_id.$id.$childpid",
# A slot must never be handed to two live children at once.
565 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
566 $slot[$childslot]->{pid} = $childpid;
568 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
569 Log ($id, "child $childpid started on $childslotname");
570 $Jobstep->{attempts} ++;
571 $Jobstep->{starttime} = time;
572 $Jobstep->{node} = $childnode->{name};
573 $Jobstep->{slotindex} = $childslot;
574 delete $Jobstep->{stderr};
575 delete $Jobstep->{finishtime};
# The task is now running, so drop it from the pending list.
577 splice @jobstep_todo, $todo_ptr, 1;
580 $progress_is_dirty = 1;
584 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
# Supervision while tasks are running: react to freeze/info requests, publish
# progress, detect hopeless rounds and manage held slots. Many structural
# lines are not visible in this extract.
586 last THISROUND if $main::please_freeze;
587 if ($main::please_info)
589 $main::please_info = 0;
593 update_progress_stats();
601 update_progress_stats();
602 select (undef, undef, undef, 0.1);
# Refresh the published progress at least every 30 seconds.
604 elsif (time - $progress_stats_updated >= 30)
606 update_progress_stats();
# Abort the round when repeat failures dominate: at least 8 with no success
# at all, or at least 16 and more than the number of successes.
608 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
609 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
611 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
612 .($thisround_failed+$thisround_succeeded)
613 .") -- giving up on this round";
614 Log (undef, $message);
618 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops count down so that splice never shifts an index still to be
# visited.
619 for (my $i=$#freeslot; $i>=0; $i--) {
620 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
621 push @holdslot, (splice @freeslot, $i, 1);
624 for (my $i=$#holdslot; $i>=0; $i--) {
625 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
626 push @freeslot, (splice @holdslot, $i, 1);
630 # give up if no nodes are succeeding
# "Succeeding" means a slot whose node has losing_streak == 0 and has been
# put on hold fewer than 4 times.
631 if (!grep { $_->{node}->{losing_streak} == 0 &&
632 $_->{node}->{hold_count} < 4 } @slot) {
633 my $message = "Every node has failed -- giving up on this round";
634 Log (undef, $message);
# Return every held slot to the free list and clear the losing streak of each
# free slot's node.
641 push @freeslot, splice @holdslot;
642 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
645 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
648 goto THISROUND if $main::please_continue;
# On an info request: clear the flag, then freeze, collate output and save the
# log as a checkpoint (save_meta is passed a true "just checkpoint" argument).
649 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
654 update_progress_stats();
655 select (undef, undef, undef, 0.1);
656 killem (keys %proc) if $main::please_freeze;
660 update_progress_stats();
661 freeze_if_want_freeze();
664 if (!defined $success)
667 $thisround_succeeded == 0 &&
668 ($thisround_failed == 0 || $thisround_failed > 4))
670 my $message = "stop because $thisround_failed tasks failed and none succeeded";
671 Log (undef, $message);
# Run another level while the overall outcome is still undecided.
680 goto ONELEVEL if !defined $success;
# Wrap-up: release the node allocation, collate task outputs and record the
# final result on the job.
683 release_allocation();
685 $Job->{'output'} = &collate_output();
# success takes the value of $success only when an output locator exists;
# otherwise it is the (false) output value itself.
686 $Job->{'success'} = $Job->{'output'} && $success;
689 if ($Job->{'output'})
# Fetch the manifest text with the external "whget" command (capturex is
# imported from IPC::System::Simple) and register it as a collection whose
# uuid is the output locator.
692 my $manifest_text = capturex("whget", $Job->{'output'});
693 $arv->{'collections'}->{'create'}->execute('collection' => {
694 'uuid' => $Job->{'output'},
695 'manifest_text' => $manifest_text,
# $@ is logged here, so the registration above is presumably wrapped in an
# eval on lines not shown.
699 Log (undef, "Failed to register output manifest: $@");
703 Log (undef, "finish");
# update_progress_stats: refresh $Job->{'tasks_summary'} (todo / done /
# running counts) and log a status line. Takes no arguments.
710 sub update_progress_stats
# The timestamp is refreshed even when nothing changed, so the caller's
# 30-second timer restarts on every call.
712 $progress_stats_updated = time;
713 return if !$progress_is_dirty;
# "running" is derived: all slots minus the free ones minus the held ones.
714 my ($todo, $done, $running) = (scalar @jobstep_todo,
715 scalar @jobstep_done,
716 scalar @slot - scalar @freeslot - scalar @holdslot);
717 $Job->{'tasks_summary'} ||= {};
718 $Job->{'tasks_summary'}->{'todo'} = $todo;
719 $Job->{'tasks_summary'}->{'done'} = $done;
720 $Job->{'tasks_summary'}->{'running'} = $running;
722 Log (undef, "status: $done done, $running running, $todo todo");
723 $progress_is_dirty = 0;
# Body of the child-reaping routine (its sub header is not visible in this
# extract). Collects at most one exited child per call without blocking and
# returns 0 when there is none.
730 my $pid = waitpid (-1, WNOHANG);
731 return 0 if $pid <= 0;
733 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
735 . $slot[$proc{$pid}->{slot}]->{cpu});
736 my $jobstepid = $proc{$pid}->{jobstep};
737 my $elapsed = time - $proc{$pid}->{time};
738 my $Jobstep = $jobstep[$jobstepid];
741 my $exitinfo = "exit $exitcode";
# The task record is reloaded so that the success flag read next is whatever
# is currently stored on the task.
742 $Jobstep->{'arvados_task'}->reload;
743 my $success = $Jobstep->{'arvados_task'}->{success};
745 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
747 if (!defined $success) {
748 # task did not indicate one way or the other --> fail
749 $Jobstep->{'arvados_task'}->{success} = 0;
750 $Jobstep->{'arvados_task'}->save;
# Failure handling. A failure blamed on the node (node_fail) is not counted
# against the task's attempt budget.
756 my $no_incr_attempts;
757 $no_incr_attempts = 1 if $Jobstep->{node_fail};
760 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
762 # Check for signs of a failed or misconfigured node
763 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
764 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
765 # Don't count this against jobstep failure thresholds if this
766 # node is already suspected faulty and srun exited quickly
767 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
769 $Jobstep->{attempts} > 1) {
770 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
771 $no_incr_attempts = 1;
772 --$Jobstep->{attempts};
774 ban_node_by_slot($proc{$pid}->{slot});
# The failed task goes back on the pending list for another try.
777 push @jobstep_todo, $jobstepid;
778 Log ($jobstepid, "failure in $elapsed seconds");
# NOTE(review): if the suspect-node branch above ran on the same path, the
# attempts counter has already been decremented once there and is
# decremented again here -- confirm whether that is intended.
780 --$Jobstep->{attempts} if $no_incr_attempts;
781 $Job->{'tasks_summary'}->{'failed'}++;
# Success handling: clear the node's failure state and record the task as done.
785 ++$thisround_succeeded;
786 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
787 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
788 push @jobstep_done, $jobstepid;
789 Log ($jobstepid, "success in $elapsed seconds");
791 $Jobstep->{exitcode} = $exitcode;
792 $Jobstep->{finishtime} = time;
793 process_stderr ($jobstepid, $success);
794 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the task's pipe and give its slot back to the free list.
796 close $reader{$jobstepid};
797 delete $reader{$jobstepid};
798 delete $slot[$proc{$pid}->{slot}]->{pid};
799 push @freeslot, $proc{$pid}->{slot};
# Pick up tasks that the finished task created and queue them as new jobsteps;
# each one's level is taken from the task's 'sequence' field.
803 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
805 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
807 'order' => 'qsequence'
809 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
811 'level' => $arvados_task->{'sequence'},
813 'arvados_task' => $arvados_task
815 push @jobstep, $jobstep;
816 push @jobstep_todo, $#jobstep;
819 $progress_is_dirty = 1;
# Body of the squeue polling routine (its sub header is not visible in this
# extract). It is rate limited twice: the kill list is looked at no more than
# once every 4 seconds, squeue itself no more than once every 60 seconds.
826 # return if the kill list was checked <4 seconds ago
827 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
831 $squeue_kill_checked = time;
833 # use killem() on procs whose killtime is reached
836 if (exists $proc{$_}->{killtime}
837 && $proc{$_}->{killtime} <= time)
843 # return if the squeue was checked <60 seconds ago
844 if (defined $squeue_checked && $squeue_checked > time - 60)
848 $squeue_checked = time;
852 # here is an opportunity to check for mysterious problems with local procs
856 # get a list of steps still running
# "&& echo ok" appends a sentinel line so a failed squeue run can be told
# apart from an empty listing.
857 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
# NOTE(review): backtick lines keep their trailing newline, so this compare
# only works if @squeue is chomped on a line not shown -- confirm.
859 if ($squeue[-1] ne "ok")
865 # which of my jobsteps are running, according to squeue?
869 if (/^(\d+)\.(\d+) (\S+)/)
871 if ($1 eq $ENV{SLURM_JOBID})
878 # which of my active child procs (>60s old) were not mentioned by squeue?
881 if ($proc{$_}->{time} < time - 60
882 && !exists $ok{$proc{$_}->{jobstepname}}
883 && !exists $proc{$_}->{killtime})
885 # kill this proc if it hasn't exited in 30 seconds
886 $proc{$_}->{killtime} = time + 30;
# release_allocation: cancel the slurm job named by SLURM_JOBID.
# NOTE(review): presumably guarded by a slurm check on a line not shown;
# the scancel exit status is not inspected.
892 sub release_allocation
896 Log (undef, "release job allocation");
897 system "scancel $ENV{SLURM_JOBID}";
# Drain whatever output is currently available from every task pipe (sub
# header not visible in this extract).
905 foreach my $job (keys %reader)
# Read in chunks of up to 8192 bytes until sysread returns 0 or fails.
908 while (0 < sysread ($reader{$job}, $buf, 8192))
910 print STDERR $buf if $ENV{CRUNCH_DEBUG};
911 $jobstep[$job]->{stderr} .= $buf;
912 preprocess_stderr ($job);
# Bound the per-task buffer: once it exceeds 16384 bytes, drop the oldest 8192.
913 if (length ($jobstep[$job]->{stderr}) > 16384)
915 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: consume complete lines from the front of a task's
# buffered stderr, log each one and react to known srun error messages.
924 sub preprocess_stderr
# Take one newline-terminated line at a time; a trailing partial line stays
# in the buffer.
928 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
930 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
931 Log ($job, "stderr $line");
# The allocation is gone or cannot be confirmed: request a freeze.
932 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
934 $main::please_freeze = 1;
# Node-level failure: flag the task so the failure is blamed on the node, and
# put the node on hold.
936 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
937 $jobstep[$job]->{node_fail} = 1;
938 ban_node_by_slot($jobstep[$job]->{slotindex});
# Final stderr pass for a finished task (sub header not visible): handle the
# complete lines first, then log whatever is left in the buffer line by line.
948 preprocess_stderr ($job);
951 Log ($job, "stderr $_");
952 } split ("\n", $jobstep[$job]->{stderr});
# Body of the output collation routine (sub header and loop header are not
# visible in this extract). Combines the outputs of the usable tasks into one
# job output and stores it in $Job->{'output'}.
958 my $whc = Warehouse->new;
959 Log (undef, "collate");
960 $whc->write_start (1);
# Skip tasks with no output key, without a true success flag, or with a
# non-zero exit code.
964 next if (!exists $_->{'arvados_task'}->{output} ||
965 !$_->{'arvados_task'}->{'success'} ||
966 $_->{'exitcode'} != 0);
967 my $output = $_->{'arvados_task'}->{output};
# Anything that is not a bare 32-hex-digit locator (with optional +hints) is
# written through as literal output text.
968 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
970 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
971 $whc->write_data ($output);
# With a single jobstep its locator is used directly as the job output.
973 elsif (@jobstep == 1)
975 $joboutput = $output;
# Otherwise fetch the block the locator names and append its content.
978 elsif (defined (my $outblock = $whc->fetch_block ($output)))
980 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
981 $whc->write_data ($outblock);
# A failed fetch is recorded inline in the collated output instead of aborting.
985 my $errstr = $whc->errstr;
986 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
990 $joboutput = $whc->write_finish if !defined $joboutput;
993 Log (undef, "output $joboutput");
994 $Job->{'output'} = $joboutput;
999 Log (undef, "output undef");
# Per-process body of the kill routine (sub and loop headers not visible; $_
# is the pid). Escalates SIGINT -> SIGTERM -> SIGKILL, moving to the next
# signal once the previous one was sent more than 4 seconds ago; each signal
# is sent only if it has not been sent to this process before.
1009 my $sig = 2; # SIGINT first
1010 if (exists $proc{$_}->{"sent_$sig"} &&
1011 time - $proc{$_}->{"sent_$sig"} > 4)
1013 $sig = 15; # SIGTERM if SIGINT doesn't work
1015 if (exists $proc{$_}->{"sent_$sig"} &&
1016 time - $proc{$_}->{"sent_$sig"} > 4)
1018 $sig = 9; # SIGKILL if SIGTERM doesn't work
1020 if (!exists $proc{$_}->{"sent_$sig"})
1022 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1024 select (undef, undef, undef, 0.1);
1027 kill $sig, $_; # srun wants two SIGINT to really interrupt
# Remember when this signal went out and how long the process had been running.
1029 $proc{$_}->{"sent_$sig"} = time;
1030 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set the bit for this handle's file descriptor number in the $bits vector.
1040 vec($bits,fileno($_),1) = 1;
# Log: write one log message to STDERR and, while $metastream is set, to the
# log stream as well.
#   $_[0]  jobstep id the message belongs to (undef for job-level messages)
#   $_[1]  message text; a multi-line message is split on newlines
1046 sub Log # ($jobstep_id, $logmessage)
1048 if ($_[1] =~ /\n/) {
1049 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1054 my $fh = select STDERR; $|=1; select $fh;
# Message layout: "<job id> <pid> <jobstep id> <text>".
1055 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Replace every character outside the range space..~ (octal 176) with a
# backslash followed by its three-digit octal code.
1056 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A UTC timestamp is only needed for the log stream or an interactive STDERR.
1059 if ($metastream || -t STDERR) {
1060 my @gmtime = gmtime;
1061 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1062 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp prefix only when it is a terminal.
1064 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1066 return if !$metastream;
1067 $metastream->write_data ($datetime . " " . $message);
# reconnect_database: make sure $dbh is a working database handle. Does
# nothing for jobs given inline (no uuid) or when the current handle still
# answers a trivial query.
1071 sub reconnect_database
1073 return if !$job_has_uuid;
1074 return if ($dbh && $dbh->do ("select now()"));
1077 $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1079 $dbh->{InactiveDestroy} = 1;
1082 warn ($DBI::errstr);
# Give up when no connection could be established (any retry logic is on
# lines not shown).
1085 croak ($DBI::errstr) if !$dbh;
# Body of the database "do" wrapper (sub header not visible). Jobs without a
# uuid skip the database and report success. A statement that fails with a
# "server has gone away" error is retried once after reconnecting; any other
# result is returned unchanged.
1091 return 1 if !$job_has_uuid;
1092 my $ret = $dbh->do (@_);
1093 return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1094 reconnect_database();
1095 return $dbh->do (@_);
# Body of the fatal-error routine (sub header and final lines not visible).
# Logs the message with the caller's file and line; if tasks are still
# pending it freezes state and collates partial output; finally it saves the
# log while the log stream still exists.
1101 my ($package, $file, $line) = caller;
1102 my $message = "@_ at $file line $line\n";
1103 Log (undef, $message);
1104 freeze() if @jobstep_todo;
1105 collate_output() if @jobstep_todo;
1107 save_meta() if $metastream;
# Mark the job as finished and unsuccessful (sub header not visible; whether
# the record is saved afterwards is on lines not shown). Inline jobs without
# a uuid are left alone.
1114 return if !$job_has_uuid;
1116 $Job->{'running'} = 0;
1117 $Job->{'success'} = 0;
1118 $Job->{'finished_at'} = time;
# Body of the log-saving routine (sub header not visible). Stores the log
# stream and records its locator in $Job->{'log'}.
#   $justcheckpoint  true: save a copy and keep logging to the live stream;
#                    false: this is the final save and the live stream is
#                    dropped afterwards.
1125 my $justcheckpoint = shift; # false if this will be the last meta saved
1126 my $m = $metastream;
1127 $m = $m->copy if $justcheckpoint;
1129 my $loglocator = $m->as_key;
1130 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1131 Log (undef, "meta key is $loglocator");
1132 $Job->{'log'} = $loglocator;
# freeze_if_want_freeze: when a freeze has been requested, release the
# allocation and kill the remaining children. Arguments are extra pids to be
# treated as children.
1137 sub freeze_if_want_freeze
1139 if ($main::please_freeze)
1141 release_allocation();
1144 # kill some srun procs before freeze+stop
# Register the extra pids in %proc (with empty records) so that killem
# signals them as well.
1145 map { $proc{$_} = {} } @_;
1148 killem (keys %proc);
1149 select (undef, undef, undef, 0.1);
# Reap everything that has exited so far and forget those pids.
1151 while (($died = waitpid (-1, WNOHANG)) > 0)
1153 delete $proc{$died};
# Body of the freeze routine (sub header not visible).
# NOTE(review): the first message says freezing is not implemented; if a
# return follows it on a line not shown, everything below is dead code.
1168 Log (undef, "Freeze not implemented");
# Serialize the job and every jobstep as "key=value" lines into state.txt,
# with records separated by a blank line.
1172 Log (undef, "freeze");
1174 my $freezer = new Warehouse::Stream (whc => $whc);
1176 $freezer->name (".");
1177 $freezer->write_start ("state.txt");
1179 $freezer->write_data (join ("\n",
1183 $_ . "=" . freezequote($Job->{$_})
1184 } grep { $_ ne "id" } keys %$Job) . "\n\n");
1186 foreach my $Jobstep (@jobstep)
# NOTE(review): in the key filter below the ^ anchor binds only to "stderr";
# keys merely containing "slotindex" or "node_fail" are excluded too, as is
# any key that starts with "stderr". /^(?:stderr|slotindex|node_fail)$/ may
# be what was meant -- confirm.
1188 my $str = join ("\n",
1191 $_ . "=" . freezequote ($Jobstep->{$_})
1193 $_ !~ /^stderr|slotindex|node_fail/
1195 $freezer->write_data ($str."\n\n");
# Pending merge list, written as one doubly-quoted record.
1197 if (@jobstep_tomerge)
1199 $freezer->write_data
1200 ("merge $jobstep_tomerge_level "
1201 . freezequote (join ("\n",
1202 map { freezequote ($_) } @jobstep_tomerge))
# Finish the stream, record its key in the database and store it in Keep.
1206 $freezer->write_finish;
1207 my $frozentokey = $freezer->as_key;
1209 Log (undef, "frozento key is $frozentokey");
1210 dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1211 $frozentokey, $job_id);
1212 my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1213 Log (undef, "frozento+K key is $kfrozentokey");
1214 return $frozentokey;
# Body of the thaw routine (sub header not visible).
# NOTE(review): the routine starts by calling croak with "Thaw not
# implemented"; if croak does not return, everything below is unreachable.
1220 croak ("Thaw not implemented");
1224 Log (undef, "thaw from $key");
1229 @jobstep_tomerge = ();
1230 $jobstep_tomerge_level = 0;
# The key is a comma-separated list of hashes naming the frozen state stream.
1233 my $stream = new Warehouse::Stream ( whc => $whc,
1234 hash => [split (",", $key)] );
# Records are separated by blank lines; each is a job record, a merge record
# or a jobstep record.
1236 while (my $dataref = $stream->read_until (undef, "\n\n"))
1238 if ($$dataref =~ /^job /)
1240 foreach (split ("\n", $$dataref))
1242 my ($k, $v) = split ("=", $_, 2);
1243 $frozenjob->{$k} = freezeunquote ($v);
1248 if ($$dataref =~ /^merge (\d+) (.*)/)
1250 $jobstep_tomerge_level = $1;
1252 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1257 foreach (split ("\n", $$dataref))
1259 my ($k, $v) = split ("=", $_, 2);
1260 $Jobstep->{$k} = freezeunquote ($v) if $k;
# Restored steps start with a fresh attempt counter; a step whose recorded
# exit code is the string "0" counts as done, every other step is queued again.
1262 $Jobstep->{attempts} = 0;
1263 push @jobstep, $Jobstep;
1265 if ($Jobstep->{exitcode} eq "0")
1267 push @jobstep_done, $#jobstep;
1271 push @jobstep_todo, $#jobstep;
# The frozen job definition overrides these fields of the current job.
1275 foreach (qw (script script_version script_parameters))
1277 $Job->{$_} = $frozenjob->{$_};
# Undo backslash escaping: "\n" becomes a newline, any other backslash-escaped
# character becomes the character itself.
1295 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Body of the srun wrapper (sub header and several lines not visible).
# Arguments, in order: array ref of srun arguments, array ref of the command
# to execute, optional options hash ref. A fourth argument is presumably
# unpacked into $stdin on a line not shown.
1302 my $srunargs = shift;
1303 my $execargs = shift;
1304 my $opts = shift || {};
# Without slurm the srun arguments are dropped and the command runs directly.
1306 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Debug trace of the command line; arguments containing a space are shown in
# single quotes.
1307 print STDERR (join (" ",
1308 map { / / ? "'$_'" : $_ }
1311 if $ENV{CRUNCH_DEBUG};
# Feed $stdin to the command: open STDIN, "-|" forks, and the forked child's
# STDOUT becomes this process's STDIN; the child prints the text and closes.
1313 if (defined $stdin) {
1314 my $child = open STDIN, "-|";
1315 defined $child or die "no fork: $!";
1317 print $stdin or die $!;
1318 close STDOUT or die $!;
# With the fork option the command runs via system() and its status is
# returned to the caller.
1323 return system (@$args) if $opts->{fork};
# Reached only if the exec (presumably on a line not shown) failed.
1326 warn "ENV size is ".length(join(" ",%ENV));
1327 die "exec failed: $!: @$args";
# ban_node_by_slot: put the node that owns the given slot on hold for 60
# seconds and count the hold. $slotid is presumably unpacked from the
# argument list on a line not shown.
1331 sub ban_node_by_slot {
1332 # Don't start any new jobsteps on this node for 60 seconds
1334 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1335 $slot[$slotid]->{node}->{hold_count}++;
1336 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1342 # checkout-and-build
# Inputs come from the environment: checkout directory, commit to build and
# repository URL.
1346 my $destdir = $ENV{"CRUNCH_SRC"};
1347 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1348 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Lock file next to the checkout (the locking call itself is on a line not
# shown).
1350 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# "$destdir.commit" is a symlink whose target is the commit id of the last
# completed install; a match means this commit is already installed.
1352 if (readlink ("$destdir.commit") eq $commit) {
# From here on stdout and stderr go to "$destdir.log". The open results are
# not checked.
1356 open STDOUT, ">", "$destdir.log";
1357 open STDERR, ">&STDOUT";
# Reuse an existing checkout, otherwise clone, otherwise give up.
1359 if (-d "$destdir/.git") {
1360 chdir $destdir or die "chdir $destdir: $!";
1361 if (0 != system (qw(git remote set-url origin), $repo)) {
1362 # awful... for old versions of git that don't know "remote set-url"
1363 shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1366 elsif ($repo && $commit)
1368 shell_or_die('git', 'clone', $repo, $destdir);
1369 chdir $destdir or die "chdir $destdir: $!";
1370 shell_or_die(qw(git config clean.requireForce false));
1373 die "$destdir does not exist, and no repo/commit specified -- giving up";
# Drop the "installed" marker before touching the tree, then bring the work
# tree to the requested commit.
1377 unlink "$destdir.commit";
1378 shell_or_die (qw(git stash));
1379 shell_or_die (qw(git clean -d -x));
1380 shell_or_die (qw(git fetch origin));
1381 shell_or_die (qw(git checkout), $commit);
1385 chomp ($pwd = `pwd`);
1386 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Run install.sh when present; otherwise tests/autotests.sh when that exists.
# Both are passed the install directory.
1388 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1390 shell_or_die ("./tests/autotests.sh", $install_dir);
1391 } elsif (-e "./install.sh") {
1392 shell_or_die ("./install.sh", $install_dir);
# Record success: create the marker symlink under a temporary name, then
# rename it into place.
1396 unlink "$destdir.commit.new";
1397 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1398 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# Body of the run-or-die helper (sub header and the command invocation are
# on lines not shown). Echoes the command to STDERR when DEBUG is set in the
# environment, and dies with $! and the hex exit status when it fails.
1407 if ($ENV{"DEBUG"}) {
1408 print STDERR "@_\n";
1411 or die "@_ failed: $! exit 0x".sprintf("%x",$?);