2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados and run tasks on compute nodes (typically
11 invoked by the scheduler on the controller):
13 crunch-job --uuid x-y-z
15 Obtain job details from the command line and run tasks on the local machine
16 (typically invoked by an application or developer on a VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
20 =head1 RUNNING JOBS LOCALLY
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when the job finishes.
26 If the job succeeds, the job's output locator is printed on stdout.
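For example, a local run might look like this (the script path and name are
placeholders, as in the synopsis above; only the stdout/stderr behavior
described here is assumed):

 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname"}' \
   2>crunch-job.log >output-locator.txt

On success, output-locator.txt then holds the output locator and
crunch-job.log holds the log messages.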
28 While the job is running, the following signals are accepted:
32 =item control-C, SIGINT, SIGQUIT
34 Save a checkpoint, terminate any job tasks that are running, and stop.
38 Save a checkpoint and continue.
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or deallocated). Currently this is a no-op.
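For example, to tell a running crunch-job to save a checkpoint and stop, one
of the signals above can be sent from another shell (the pgrep pattern is
only illustrative):

 kill -INT `pgrep -f crunch-job`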
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
57 use Warehouse::Stream;
59 $ENV{"TMPDIR"} ||= "/tmp";
60 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
61 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
62 mkdir ($ENV{"CRUNCH_TMP"});
67 GetOptions('force-unlock' => \$force_unlock,
69 'resume-stash=s' => \$resume_stash,
72 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
73 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
82 $main::ENV{CRUNCH_DEBUG} = 1;
86 $main::ENV{CRUNCH_DEBUG} = 0;
91 my $arv = Arvados->new;
92 my $metastream = Warehouse::Stream->new;
94 $metastream->write_start('log.txt');
103 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
104 $User = $arv->{'users'}->{'current'}->execute;
105 if (!$force_unlock) {
106 if ($Job->{'is_locked_by'}) {
107 croak("Job is locked: " . $Job->{'is_locked_by'});
109 if (defined $Job->{'success'}) {
110 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
112 if ($Job->{'running'}) {
113 croak("Job 'running' flag is already set");
115 if ($Job->{'started_at'}) {
116 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
122 $Job = JSON::decode_json($jobspec);
126 map { croak ("No $_ specified") unless $Job->{$_} }
127 qw(script script_version script_parameters);
130 if (!defined $Job->{'uuid'}) {
131 chomp (my $hostname = `hostname -s`); $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
134 $job_id = $Job->{'uuid'};
138 $Job->{'resource_limits'} ||= {};
139 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
140 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
143 Log (undef, "check slurm allocation");
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
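# A minimal sketch (not used here) of how $ENV{SLURM_TASKS_PER_NODE} could be
# expanded into one task count per node, assuming the "N(xM)" syntax shown above:
#   my @tasks_per_node;
#   foreach my $spec (split (",", $ENV{SLURM_TASKS_PER_NODE} || "")) {
#     if ($spec =~ /^(\d+)(?:\(x(\d+)\))?$/) {
#       push @tasks_per_node, ($1) x ($2 || 1);
#     }
#   }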
150 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151 push @sinfo, "$localcpus localhost";
153 if (exists $ENV{SLURM_NODELIST})
155 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
159 my ($ncpus, $slurm_nodelist) = split;
160 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
163 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
166 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
169 foreach (split (",", $ranges))
182 push @nodelist, map {
184 $n =~ s/\[[-,\d]+\]/$_/;
191 push @nodelist, $nodelist;
194 foreach my $nodename (@nodelist)
196 Log (undef, "node $nodename - $ncpus slots");
197 my $node = { name => $nodename,
201 foreach my $cpu (1..$ncpus)
203 push @slot, { node => $node,
207 push @node, @nodelist;
212 # Ensure that we get one jobstep running on each allocated node before
213 # we start overloading nodes with concurrent steps
215 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
222 # Claim this job, and make sure nobody else does
224 $Job->{'is_locked_by'} = $User->{'uuid'};
225 $Job->{'started_at'} = time;
226 $Job->{'running'} = 1;
227 $Job->{'success'} = undef;
228 $Job->{'tasks_summary'} = { 'failed' => 0,
232 unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {  # uuids are strings, so compare with eq
233 croak("Error while updating / locking job");
238 Log (undef, "start");
239 $SIG{'INT'} = sub { $main::please_freeze = 1; };
240 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
241 $SIG{'TERM'} = \&croak;
242 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
243 $SIG{'ALRM'} = sub { $main::please_info = 1; };
244 $SIG{'CONT'} = sub { $main::please_continue = 1; };
245 $main::please_freeze = 0;
246 $main::please_info = 0;
247 $main::please_continue = 0;
248 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
250 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
251 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
252 $ENV{"JOB_UUID"} = $job_id;
256 my @jobstep_todo = ();
257 my @jobstep_done = ();
258 my @jobstep_tomerge = ();
259 my $jobstep_tomerge_level = 0;
261 my $squeue_kill_checked;
262 my $output_in_keep = 0;
266 if (defined $Job->{thawedfromkey})
268 thaw ($Job->{thawedfromkey});
272 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
273 'job_uuid' => $Job->{'uuid'},
278 push @jobstep, { level => 0,
280 arvados_task => $first_task,
282 push @jobstep_todo, 0;
289 $build_script = <DATA>;
293 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
295 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
298 $ENV{"CRUNCH_SRC"} = $Job->{revision};
302 Log (undef, "Install revision ".$Job->{revision});
303 my $nodelist = join(",", @node);
305 # Clean out crunch_tmp/work and crunch_tmp/opt
307 my $cleanpid = fork();
310 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
311 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
316 last if $cleanpid == waitpid (-1, WNOHANG);
317 freeze_if_want_freeze ($cleanpid);
318 select (undef, undef, undef, 0.1);
320 Log (undef, "Clean-work-dir exited $?");
322 # Install requested code version
325 my @srunargs = ("srun",
326 "--nodelist=$nodelist",
327 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
331 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
334 my $treeish = $Job->{'script_version'};
335 my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
336 # Todo: let script_version specify alternate repo
337 $ENV{"CRUNCH_SRC_URL"} = $repo;
339 # Create/update our clone of the remote git repo
341 if (!-d $ENV{"CRUNCH_SRC"}) {
342 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
343 or croak ("git clone $repo failed: exit ".($?>>8));
344 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
346 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
348 # If this looks like a subversion r#, look for it in git-svn commit messages
350 if ($treeish =~ m{^\d{1,4}$}) {
351 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
353 if ($gitlog =~ /^[a-f0-9]{40}$/) {
355 Log (undef, "Using commit $commit for revision $treeish");
359 # If that didn't work, try asking git to look it up as a tree-ish.
361 if (!defined $commit) {
363 my $cooked_treeish = $treeish;
364 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
365 # Looks like a git branch name -- make sure git knows it's
366 # relative to the remote repo
367 $cooked_treeish = "origin/$treeish";
370 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
372 if ($found =~ /^[0-9a-f]{40}$/s) {
374 if ($commit ne $treeish) {
375 # Make sure we record the real commit id in the database,
376 # frozentokey, logs, etc. -- instead of an abbreviation or a
377 # branch name which can become ambiguous or point to a
378 # different commit in the future.
379 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
380 Log (undef, "Using commit $commit for tree-ish $treeish");
381 if ($commit ne $treeish) {
382 $Job->{'script_version'} = $commit;
383 $Job->save() or croak("Error while updating job");
389 if (defined $commit) {
390 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
391 @execargs = ("sh", "-c",
392 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
395 croak ("could not figure out commit id for $treeish");
398 my $installpid = fork();
399 if ($installpid == 0)
401 srun (\@srunargs, \@execargs, {}, $build_script);
406 last if $installpid == waitpid (-1, WNOHANG);
407 freeze_if_want_freeze ($installpid);
408 select (undef, undef, undef, 0.1);
410 Log (undef, "Install exited $?");
415 foreach (qw (script script_version script_parameters resource_limits))
417 Log (undef, $_ . " " . $Job->{$_});
419 foreach (split (/\n/, $Job->{knobs}))
421 Log (undef, "knob " . $_);
432 my $thisround_succeeded = 0;
433 my $thisround_failed = 0;
434 my $thisround_failed_multiple = 0;
436 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
437 or $a <=> $b } @jobstep_todo;
438 my $level = $jobstep[$jobstep_todo[0]]->{level};
439 Log (undef, "start level $level");
444 my @freeslot = (0..$#slot);
447 my $progress_is_dirty = 1;
448 my $progress_stats_updated = 0;
450 update_progress_stats();
455 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
457 $main::please_continue = 0;
459 my $id = $jobstep_todo[$todo_ptr];
460 my $Jobstep = $jobstep[$id];
461 if ($Jobstep->{level} != $level)
465 if ($Jobstep->{attempts} > 9)
467 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
472 pipe $reader{$id}, "writer" or croak ($!);
473 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
474 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
476 my $childslot = $freeslot[0];
477 my $childnode = $slot[$childslot]->{node};
478 my $childslotname = join (".",
479 $slot[$childslot]->{node}->{name},
480 $slot[$childslot]->{cpu});
481 my $childpid = fork();
484 $SIG{'INT'} = 'DEFAULT';
485 $SIG{'QUIT'} = 'DEFAULT';
486 $SIG{'TERM'} = 'DEFAULT';
488 foreach (values (%reader))
492 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
493 open(STDOUT,">&writer");
494 open(STDERR,">&writer");
499 delete $ENV{"GNUPGHOME"};
500 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
501 $ENV{"TASK_QSEQUENCE"} = $id;
502 $ENV{"TASK_SEQUENCE"} = $level;
503 $ENV{"JOB_SCRIPT"} = $Job->{script};
504 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
505 $param =~ tr/a-z/A-Z/;
506 $ENV{"JOB_PARAMETER_$param"} = $value;
508 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
509 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
510 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
511 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
517 "--nodelist=".$childnode->{name},
518 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
519 "--job-name=$job_id.$id.$$",
521 my @execargs = qw(sh);
522 my $build_script_to_send = "";
524 "mkdir -p $ENV{CRUNCH_TMP}/revision "
525 ."&& cd $ENV{CRUNCH_TMP} ";
528 $build_script_to_send = $build_script;
532 elsif (!$skip_install)
537 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
539 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
540 ." && ./installrevision "
544 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
546 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
547 my @execargs = ('bash', '-c', $command);
548 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
552 if (!defined $childpid)
559 $proc{$childpid} = { jobstep => $id,
562 jobstepname => "$job_id.$id.$childpid",
564 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
565 $slot[$childslot]->{pid} = $childpid;
567 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
568 Log ($id, "child $childpid started on $childslotname");
569 $Jobstep->{attempts} ++;
570 $Jobstep->{starttime} = time;
571 $Jobstep->{node} = $childnode->{name};
572 $Jobstep->{slotindex} = $childslot;
573 delete $Jobstep->{stderr};
574 delete $Jobstep->{finishtime};
576 splice @jobstep_todo, $todo_ptr, 1;
579 $progress_is_dirty = 1;
583 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
585 last THISROUND if $main::please_freeze;
586 if ($main::please_info)
588 $main::please_info = 0;
592 update_progress_stats();
600 update_progress_stats();
601 select (undef, undef, undef, 0.1);
603 elsif (time - $progress_stats_updated >= 30)
605 update_progress_stats();
607 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
608 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
610 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
611 .($thisround_failed+$thisround_succeeded)
612 .") -- giving up on this round";
613 Log (undef, $message);
617 # move slots from freeslot to holdslot (or back to freeslot) if necessary
618 for (my $i=$#freeslot; $i>=0; $i--) {
619 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
620 push @holdslot, (splice @freeslot, $i, 1);
623 for (my $i=$#holdslot; $i>=0; $i--) {
624 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
625 push @freeslot, (splice @holdslot, $i, 1);
629 # give up if no nodes are succeeding
630 if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
631 my $message = "Every node has failed -- giving up on this round";
632 Log (undef, $message);
639 push @freeslot, splice @holdslot;
640 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
643 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
646 goto THISROUND if $main::please_continue;
647 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
652 update_progress_stats();
653 select (undef, undef, undef, 0.1);
654 killem (keys %proc) if $main::please_freeze;
658 update_progress_stats();
659 freeze_if_want_freeze();
662 if (!defined $success)
665 $thisround_succeeded == 0 &&
666 ($thisround_failed == 0 || $thisround_failed > 4))
668 my $message = "stop because $thisround_failed tasks failed and none succeeded";
669 Log (undef, $message);
678 goto ONELEVEL if !defined $success;
681 release_allocation();
683 $Job->{'output'} = &collate_output();
684 $Job->{'success'} = 0 if !$Job->{'output'};
687 if ($Job->{'output'})
689 $arv->{'collections'}->{'create'}->execute('collection' => {
690 'uuid' => $Job->{'output'},
691 'manifest_text' => `whget $Job->{'output'}`,  # capture the manifest; system() would return only an exit status
696 Log (undef, "finish");
698 $Job->{'success'} = $Job->{'output'} && $success;
706 sub update_progress_stats
708 $progress_stats_updated = time;
709 return if !$progress_is_dirty;
710 my ($todo, $done, $running) = (scalar @jobstep_todo,
711 scalar @jobstep_done,
712 scalar @slot - scalar @freeslot - scalar @holdslot);
713 $Job->{'tasks_summary'} ||= {};
714 $Job->{'tasks_summary'}->{'todo'} = $todo;
715 $Job->{'tasks_summary'}->{'done'} = $done;
716 $Job->{'tasks_summary'}->{'running'} = $running;
718 Log (undef, "status: $done done, $running running, $todo todo");
719 $progress_is_dirty = 0;
726 my $pid = waitpid (-1, WNOHANG);
727 return 0 if $pid <= 0;
729 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
731 . $slot[$proc{$pid}->{slot}]->{cpu});
732 my $jobstepid = $proc{$pid}->{jobstep};
733 my $elapsed = time - $proc{$pid}->{time};
734 my $Jobstep = $jobstep[$jobstepid];
737 my $exitinfo = "exit $exitcode";
738 $Jobstep->{arvados_task}->reload;
739 my $success = $Jobstep->{arvados_task}->{success};
741 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
743 if (!defined $success) {
744 # task did not indicate one way or the other --> fail
745 $Jobstep->{arvados_task}->{success} = 0;
746 $Jobstep->{arvados_task}->save;
752 --$Jobstep->{attempts} if $Jobstep->{node_fail};
754 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
756 # Check for signs of a failed or misconfigured node
757 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
758 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
759 # Don't count this against jobstep failure thresholds if this
760 # node is already suspected faulty and srun exited quickly
761 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
763 $Jobstep->{attempts} > 1) {
764 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
765 --$Jobstep->{attempts};
767 ban_node_by_slot($proc{$pid}->{slot});
770 push @jobstep_todo, $jobstepid;
771 Log ($jobstepid, "failure in $elapsed seconds");
772 $Job->{'tasks_summary'}->{'failed'}++;
776 ++$thisround_succeeded;
777 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
778 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
779 push @jobstep_done, $jobstepid;
780 Log ($jobstepid, "success in $elapsed seconds");
782 $Jobstep->{exitcode} = $exitcode;
783 $Jobstep->{finishtime} = time;
784 process_stderr ($jobstepid, $success);
785 Log ($jobstepid, "output " . $Jobstep->{arvados_task}->{output});
787 close $reader{$jobstepid};
788 delete $reader{$jobstepid};
789 delete $slot[$proc{$pid}->{slot}]->{pid};
790 push @freeslot, $proc{$pid}->{slot};
794 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute('where' => {
795 'created_by_job_task' => $Jobstep->{arvados_task}->{uuid}
797 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
799 'level' => $arvados_task->{'sequence'},
801 'arvados_task' => $arvados_task
803 push @jobstep, $jobstep;
804 push @jobstep_todo, $#jobstep;
807 $progress_is_dirty = 1;
814 # return if the kill list was checked <4 seconds ago
815 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
819 $squeue_kill_checked = time;
821 # use killem() on procs whose killtime is reached
824 if (exists $proc{$_}->{killtime}
825 && $proc{$_}->{killtime} <= time)
831 # return if the squeue was checked <60 seconds ago
832 if (defined $squeue_checked && $squeue_checked > time - 60)
836 $squeue_checked = time;
840 # here is an opportunity to check for mysterious problems with local procs
844 # get a list of steps still running
845 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
847 if ($squeue[-1] ne "ok")
853 # which of my jobsteps are running, according to squeue?
857 if (/^(\d+)\.(\d+) (\S+)/)
859 if ($1 eq $ENV{SLURM_JOBID})
866 # which of my active child procs (>60s old) were not mentioned by squeue?
869 if ($proc{$_}->{time} < time - 60
870 && !exists $ok{$proc{$_}->{jobstepname}}
871 && !exists $proc{$_}->{killtime})
873 # kill this proc if it hasn't exited in 30 seconds
874 $proc{$_}->{killtime} = time + 30;
880 sub release_allocation
884 Log (undef, "release job allocation");
885 system "scancel $ENV{SLURM_JOBID}";
893 foreach my $job (keys %reader)
896 while (0 < sysread ($reader{$job}, $buf, 8192))
898 print STDERR $buf if $ENV{CRUNCH_DEBUG};
899 $jobstep[$job]->{stderr} .= $buf;
900 preprocess_stderr ($job);
901 if (length ($jobstep[$job]->{stderr}) > 16384)
903 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
912 sub preprocess_stderr
916 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
918 if ($line =~ /\+\+\+mr/) {
921 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
922 Log ($job, "stderr $line");
923 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
925 $main::please_freeze = 1;
927 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
928 $jobstep[$job]->{node_fail} = 1;
929 ban_node_by_slot($jobstep[$job]->{slotindex});
939 preprocess_stderr ($job);
942 Log ($job, "stderr $_");
943 } split ("\n", $jobstep[$job]->{stderr});
949 my $whc = Warehouse->new;
950 Log (undef, "collate");
951 $whc->write_start (1);
955 next if !exists $_->{arvados_task}->{output} || $_->{exitcode} != 0;
956 my $output = $_->{arvados_task}->{output};
957 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
959 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
960 $whc->write_data ($output);
962 elsif (@jobstep == 1)
964 $joboutput = $output;
967 elsif (defined (my $outblock = $whc->fetch_block ($output)))
969 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
970 $whc->write_data ($outblock);
974 my $errstr = $whc->errstr;
975 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
979 $joboutput = $whc->write_finish if !defined $joboutput;
982 Log (undef, "output $joboutput");
983 $Job->{'output'} = $joboutput;
988 Log (undef, "output undef");
998 my $sig = 2; # SIGINT first
999 if (exists $proc{$_}->{"sent_$sig"} &&
1000 time - $proc{$_}->{"sent_$sig"} > 4)
1002 $sig = 15; # SIGTERM if SIGINT doesn't work
1004 if (exists $proc{$_}->{"sent_$sig"} &&
1005 time - $proc{$_}->{"sent_$sig"} > 4)
1007 $sig = 9; # SIGKILL if SIGTERM doesn't work
1009 if (!exists $proc{$_}->{"sent_$sig"})
1011 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1013 select (undef, undef, undef, 0.1);
1016 kill $sig, $_; # srun wants two SIGINT to really interrupt
1018 $proc{$_}->{"sent_$sig"} = time;
1019 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1029 vec($bits,fileno($_),1) = 1;
1035 sub Log # ($jobstep_id, $logmessage)
1037 if ($_[1] =~ /\n/) {
1038 for my $line (split (/\n/, $_[1])) {
1043 my $fh = select STDERR; $|=1; select $fh;
1044 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1045 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1048 if ($metastream || -t STDERR) {
1049 my @gmtime = gmtime;
1050 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1051 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1053 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1055 return if !$metastream;
1056 $metastream->write_data ($datetime . " " . $message);
1060 sub reconnect_database
1062 return if !$job_has_uuid;
1063 return if ($dbh && $dbh->do ("select now()"));
1066 $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1068 $dbh->{InactiveDestroy} = 1;
1071 warn ($DBI::errstr);
1074 croak ($DBI::errstr) if !$dbh;
1080 return 1 if !$job_has_uuid;
1081 my $ret = $dbh->do (@_);
1082 return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1083 reconnect_database();
1084 return $dbh->do (@_);
1090 my ($package, $file, $line) = caller;
1091 my $message = "@_ at $file line $line\n";
1092 Log (undef, $message);
1093 freeze() if @jobstep_todo;
1094 collate_output() if @jobstep_todo;
1096 save_meta() if $metastream;
1103 return if !$job_has_uuid;
1105 $Job->{'running'} = 0;
1106 $Job->{'success'} = 0;
1107 $Job->{'finished_at'} = time;
1114 my $justcheckpoint = shift; # false if this will be the last meta saved
1115 my $m = $metastream;
1116 $m = $m->copy if $justcheckpoint;
1118 my $loglocator = $m->as_key;
1119 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1120 Log (undef, "meta key is $loglocator");
1121 $Job->{'log'} = $loglocator;
1126 sub freeze_if_want_freeze
1128 if ($main::please_freeze)
1130 release_allocation();
1133 # kill some srun procs before freeze+stop
1134 map { $proc{$_} = {} } @_;
1137 killem (keys %proc);
1138 select (undef, undef, undef, 0.1);
1140 while (($died = waitpid (-1, WNOHANG)) > 0)
1142 delete $proc{$died};
1157 Log (undef, "Freeze not implemented");
1161 Log (undef, "freeze");
1163 my $freezer = new Warehouse::Stream (whc => $whc);
1165 $freezer->name (".");
1166 $freezer->write_start ("state.txt");
1168 $freezer->write_data (join ("\n",
1172 $_ . "=" . freezequote($Job->{$_})
1173 } grep { $_ ne "id" } keys %$Job) . "\n\n");
1175 foreach my $Jobstep (@jobstep)
1177 my $str = join ("\n",
1180 $_ . "=" . freezequote ($Jobstep->{$_})
1182 $_ !~ /^stderr|slotindex|node_fail/
1184 $freezer->write_data ($str."\n\n");
1186 if (@jobstep_tomerge)
1188 $freezer->write_data
1189 ("merge $jobstep_tomerge_level "
1190 . freezequote (join ("\n",
1191 map { freezequote ($_) } @jobstep_tomerge))
1195 $freezer->write_finish;
1196 my $frozentokey = $freezer->as_key;
1198 Log (undef, "frozento key is $frozentokey");
1199 dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1200 $frozentokey, $job_id);
1201 my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1202 Log (undef, "frozento+K key is $kfrozentokey");
1203 return $frozentokey;
1209 croak ("Thaw not implemented");
1213 Log (undef, "thaw from $key");
1218 @jobstep_tomerge = ();
1219 $jobstep_tomerge_level = 0;
1222 my $stream = new Warehouse::Stream ( whc => $whc,
1223 hash => [split (",", $key)] );
1225 while (my $dataref = $stream->read_until (undef, "\n\n"))
1227 if ($$dataref =~ /^job /)
1229 foreach (split ("\n", $$dataref))
1231 my ($k, $v) = split ("=", $_, 2);
1232 $frozenjob->{$k} = freezeunquote ($v);
1237 if ($$dataref =~ /^merge (\d+) (.*)/)
1239 $jobstep_tomerge_level = $1;
1241 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1246 foreach (split ("\n", $$dataref))
1248 my ($k, $v) = split ("=", $_, 2);
1249 $Jobstep->{$k} = freezeunquote ($v) if $k;
1251 $Jobstep->{attempts} = 0;
1252 push @jobstep, $Jobstep;
1254 if ($Jobstep->{exitcode} eq "0")
1256 push @jobstep_done, $#jobstep;
1260 push @jobstep_todo, $#jobstep;
1264 foreach (qw (script script_version script_parameters))
1266 $Job->{$_} = $frozenjob->{$_};
1284 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1291 my $srunargs = shift;
1292 my $execargs = shift;
1293 my $opts = shift || {};
1295 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1296 print STDERR (join (" ",
1297 map { / / ? "'$_'" : $_ }
1300 if $ENV{CRUNCH_DEBUG};
1302 if (defined $stdin) {
1303 my $child = open STDIN, "-|";
1304 defined $child or die "no fork: $!";
1306 print $stdin or die $!;
1307 close STDOUT or die $!;
1312 return system (@$args) if $opts->{fork};
1315 warn "ENV size is ".length(join(" ",%ENV));
1316 die "exec failed: $!: @$args";
1320 sub ban_node_by_slot {
1321 # Don't start any new jobsteps on this node for 60 seconds
1323 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1324 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1330 # checkout-and-build
1334 my $destdir = $ENV{"CRUNCH_SRC"};
1335 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1336 my $repo = $ENV{"CRUNCH_SRC_URL"};
1338 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1340 if (readlink ("$destdir.commit") eq $commit) {
1344 open STDOUT, ">", "$destdir.log";
1345 open STDERR, ">&STDOUT";
1347 if (-d "$destdir/.git") {
1348 chdir $destdir or die "chdir $destdir: $!";
1349 if (0 != system (qw(git remote set-url origin), $repo)) {
1350 # awful... for old versions of git that don't know "remote set-url"
1351 shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1354 elsif ($repo && $commit)
1356 shell_or_die('git', 'clone', $repo, $destdir);
1357 chdir $destdir or die "chdir $destdir: $!";
1358 shell_or_die(qw(git config clean.requireForce false));
1361 die "$destdir does not exist, and no repo/commit specified -- giving up";
1365 unlink "$destdir.commit";
1366 shell_or_die (qw(git stash));
1367 shell_or_die (qw(git clean -d -x));
1368 shell_or_die (qw(git fetch origin));
1369 shell_or_die (qw(git checkout), $commit);
1373 chomp ($pwd = `pwd`);
1374 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1376 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1378 shell_or_die ("./tests/autotests.sh", $install_dir);
1379 } elsif (-e "./install.sh") {
1380 shell_or_die ("./install.sh", $install_dir);
1384 unlink "$destdir.commit.new";
1385 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1386 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1395 if ($ENV{"DEBUG"}) {
1396 print STDERR "@_\n";
1399 or die "@_ failed: $! exit 0x".sprintf("%x",$?);