2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --uuid x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
20 =head1 RUNNING JOBS LOCALLY
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
26 If the job succeeds, the job's output locator is printed on stdout.
28 While the job is running, the following signals are accepted:
32 =item control-C, SIGINT, SIGQUIT
34 Save a checkpoint, terminate any job tasks that are running, and stop.
38 Save a checkpoint and continue.
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
# --- Process setup: modules, scratch directories, CLI options, clients ---
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
57 use Warehouse::Stream;
# Scratch layout: $TMPDIR/crunch-job is this job's private tmp tree;
# "work" underneath it is where task work dirs are created.
59 $ENV{"TMPDIR"} ||= "/tmp";
60 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
61 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
# mkdir result deliberately unchecked -- directory may already exist.
62 mkdir ($ENV{"CRUNCH_TMP"});
67 GetOptions('force-unlock' => \$force_unlock,
69 'resume-stash=s' => \$resume_stash,
# Running under SLURM iff both env vars are present; $jobspec that looks
# like a bare uuid means "fetch job from API" rather than inline JSON.
72 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
73 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
82 $main::ENV{CRUNCH_DEBUG} = 1;
86 $main::ENV{CRUNCH_DEBUG} = 0;
# $metastream accumulates the job log; saved to Warehouse/Keep at checkpoints.
91 my $arv = Arvados->new;
92 my $metastream = Warehouse::Stream->new;
94 $metastream->write_start('log.txt');
# --- Obtain the job record (from API server by uuid, or inline JSON) and
# --- refuse to start if another process already owns it (unless --force-unlock).
103 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
104 $User = $arv->{'users'}->{'current'}->execute;
105 if (!$force_unlock) {
106 if ($Job->{'is_locked_by'}) {
107 croak("Job is locked: " . $Job->{'is_locked_by'});
# NOTE(review): "ne undef" string-compares against undef (warns, and treats
# defined-but-empty values oddly); the intended test is
# defined($Job->{'success'}) -- confirm and fix upstream.
109 if ($Job->{'success'} ne undef) {
110 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
112 if ($Job->{'running'}) {
113 croak("Job 'running' flag is already set");
115 if ($Job->{'started_at'}) {
116 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local-mode path: jobspec was inline JSON, not a uuid.
122 $Job = JSON::decode_json($jobspec);
126 map { croak ("No $_ specified") unless $Job->{$_} }
127 qw(script script_version script_parameters);
# Synthesize a unique id for locally-submitted jobs: host-t<epoch>-p<pid>.
130 if (!defined $Job->{'uuid'}) {
131 chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
134 $job_id = $Job->{'uuid'};
# --- Discover the node allocation and build the slot table: one slot per
# --- (node, cpu) pair, capped by resource_limits.max_tasks_per_node.
138 $Job->{'resource_limits'} ||= {};
139 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
140 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
143 Log (undef, "check slurm allocation");
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Fallback when not under SLURM: localhost with one slot per local CPU.
150 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151 push @sinfo, "$localcpus localhost";
153 if (exists $ENV{SLURM_NODELIST})
155 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Each sinfo line is "<ncpus> <nodelist>"; expand SLURM's compressed
# hostlist notation (e.g. "node[1-3,5]") into individual node names.
159 my ($ncpus, $slurm_nodelist) = split;
160 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
163 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
166 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
169 foreach (split (",", $ranges))
182 push @nodelist, map {
184 $n =~ s/\[[-,\d]+\]/$_/;
191 push @nodelist, $nodelist;
194 foreach my $nodename (@nodelist)
196 Log (undef, "node $nodename - $ncpus slots");
197 my $node = { name => $nodename,
201 foreach my $cpu (1..$ncpus)
203 push @slot, { node => $node,
207 push @node, @nodelist;
212 # Ensure that we get one jobstep running on each allocated node before
213 # we start overloading nodes with concurrent steps
215 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
222 # Claim this job, and make sure nobody else does
224 $Job->{'is_locked_by'} = $User->{'uuid'};
225 $Job->{'started_at'} = time;
226 $Job->{'running'} = 1;
227 $Job->{'success'} = undef;
228 $Job->{'tasks_summary'} = { 'failed' => 0,
# NOTE(review): uuids are strings, but "==" compares numerically (both sides
# likely numify to 0 and always match); should be "eq" -- confirm upstream.
232 unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
233 croak("Error while updating / locking job");
# --- Signal handlers: INT/QUIT/TSTP request checkpoint+stop, ALRM requests
# --- a status dump, CONT resumes; handlers only set flags polled by the
# --- main loop (async-signal-safe).
238 Log (undef, "start");
239 $SIG{'INT'} = sub { $main::please_freeze = 1; };
240 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
241 $SIG{'TERM'} = \&croak;
242 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
243 $SIG{'ALRM'} = sub { $main::please_info = 1; };
244 $SIG{'CONT'} = sub { $main::please_continue = 1; };
245 $main::please_freeze = 0;
246 $main::please_info = 0;
247 $main::please_continue = 0;
248 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# NOTE(review): grep in void context for its side effect (copying NOCACHE*
# knobs into %ENV); a for-loop would express the intent better.
250 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
251 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
252 $ENV{"JOB_UUID"} = $job_id;
# --- Jobstep bookkeeping: todo/done queues, merge state, and the initial
# --- level-0 task (either thawed from a checkpoint or created fresh).
256 my @jobstep_todo = ();
257 my @jobstep_done = ();
258 my @jobstep_tomerge = ();
259 my $jobstep_tomerge_level = 0;
261 my $squeue_kill_checked;
262 my $output_in_keep = 0;
# Resume path: restore queues/steps from a previously frozen state key.
266 if (defined $Job->{thawedfromkey})
268 thaw ($Job->{thawedfromkey});
# Fresh start: create the first job_task (level 0) and queue it.
272 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
273 'job_uuid' => $Job->{'uuid'},
278 push @jobstep, { level => 0,
280 arvados_task => $first_task,
282 push @jobstep_todo, 0;
# --- Read the embedded checkout-and-build script (after __DATA__) and decide
# --- whether install can be skipped (local run with an absolute source path).
289 $build_script = <DATA>;
293 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
295 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
298 $ENV{"CRUNCH_SRC"} = $Job->{revision};
302 Log (undef, "Install revision ".$Job->{revision});
303 my $nodelist = join(",", @node);
305 # Clean out crunch_tmp/work and crunch_tmp/opt
# Clean-up runs via srun on every allocated node; parent polls with
# WNOHANG so it can still honor freeze requests while waiting.
307 my $cleanpid = fork();
310 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
311 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
316 last if $cleanpid == waitpid (-1, WNOHANG);
317 freeze_if_want_freeze ($cleanpid);
318 select (undef, undef, undef, 0.1);
320 Log (undef, "Clean-work-dir exited $?");
322 # Install requested code version
# Resolve $Job->{script_version} (commit hash, branch, tag, or legacy svn
# revision number) to a full 40-char commit id, then push the source to all
# nodes by piping the embedded build script through srun.
325 my @srunargs = ("srun",
326 "--nodelist=$nodelist",
327 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
331 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
334 my $treeish = $Job->{'script_version'};
335 my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
336 # Todo: let script_version specify alternate repo
337 $ENV{"CRUNCH_SRC_URL"} = $repo;
339 # Create/update our clone of the remote git repo
341 if (!-d $ENV{"CRUNCH_SRC"}) {
342 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
343 or croak ("git clone $repo failed: exit ".($?>>8));
344 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
# NOTE(review): backtick result and exit status of this fetch are ignored;
# a failed fetch surfaces only later as "could not figure out commit id".
346 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
348 # If this looks like a subversion r#, look for it in git-svn commit messages
350 if ($treeish =~ m{^\d{1,4}$}) {
351 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
353 if ($gitlog =~ /^[a-f0-9]{40}$/) {
355 Log (undef, "Using commit $commit for revision $treeish");
359 # If that didn't work, try asking git to look it up as a tree-ish.
361 if (!defined $commit) {
363 my $cooked_treeish = $treeish;
364 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
365 # Looks like a git branch name -- make sure git knows it's
366 # relative to the remote repo
367 $cooked_treeish = "origin/$treeish";
370 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
372 if ($found =~ /^[0-9a-f]{40}$/s) {
374 if ($commit ne $treeish) {
375 # Make sure we record the real commit id in the database,
376 # frozentokey, logs, etc. -- instead of an abbreviation or a
377 # branch name which can become ambiguous or point to a
378 # different commit in the future.
379 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
380 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this inner test repeats the enclosing "$commit ne $treeish"
# condition (line 374) -- apparently redundant; confirm against full source.
381 if ($commit ne $treeish) {
382 $Job->{'script_version'} = $commit;
383 $Job->save() or croak("Error while updating job");
389 if (defined $commit) {
390 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
# "perl -" reads the build script from stdin (supplied via srun's $stdin arg).
391 @execargs = ("sh", "-c",
392 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
395 croak ("could not figure out commit id for $treeish");
# Install runs in a forked child; parent polls, honoring freeze requests.
398 my $installpid = fork();
399 if ($installpid == 0)
401 srun (\@srunargs, \@execargs, {}, $build_script);
406 last if $installpid == waitpid (-1, WNOHANG);
407 freeze_if_want_freeze ($installpid);
408 select (undef, undef, undef, 0.1);
410 Log (undef, "Install exited $?");
# --- Log job parameters, then initialize per-round counters and free slots.
415 foreach (qw (script script_version script_parameters resource_limits))
417 Log (undef, $_ . " " . $Job->{$_});
419 foreach (split (/\n/, $Job->{knobs}))
421 Log (undef, "knob " . $_);
# Per-round stats: a "round" processes all queued steps of one level.
432 my $thisround_succeeded = 0;
433 my $thisround_failed = 0;
434 my $thisround_failed_multiple = 0;
# Order the queue by level (then index) and take the lowest level this round.
436 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
437 or $a <=> $b } @jobstep_todo;
438 my $level = $jobstep[$jobstep_todo[0]]->{level};
439 Log (undef, "start level $level");
444 my @freeslot = (0..$#slot);
447 my $progress_is_dirty = 1;
448 my $progress_stats_updated = 0;
450 update_progress_stats();
# --- THISROUND dispatch loop: for each queued step at the current level,
# --- fork a child that execs the task (via srun under SLURM, locally
# --- otherwise), wired back to the parent through a non-blocking pipe.
455 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
457 $main::please_continue = 0;
459 my $id = $jobstep_todo[$todo_ptr];
460 my $Jobstep = $jobstep[$id];
461 if ($Jobstep->{level} != $level)
# Give up on a step after 10 attempts.
465 if ($Jobstep->{attempts} > 9)
467 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
# Parent reads the child's stdout/stderr via $reader{$id}, set non-blocking.
472 pipe $reader{$id}, "writer" or croak ($!);
473 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
474 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
476 my $childslot = $freeslot[0];
477 my $childnode = $slot[$childslot]->{node};
478 my $childslotname = join (".",
479 $slot[$childslot]->{node}->{name},
480 $slot[$childslot]->{cpu});
481 my $childpid = fork();
# ---- child process: reset signals, redirect stdout/stderr into the pipe,
# ---- build the task environment, then exec the task via srun().
484 $SIG{'INT'} = 'DEFAULT';
485 $SIG{'QUIT'} = 'DEFAULT';
486 $SIG{'TERM'} = 'DEFAULT';
488 foreach (values (%reader))
# Clear close-on-exec so "writer" survives the exec below.
492 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
493 open(STDOUT,">&writer");
494 open(STDERR,">&writer");
499 delete $ENV{"GNUPGHOME"};
500 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
501 $ENV{"TASK_QSEQUENCE"} = $id;
502 $ENV{"TASK_SEQUENCE"} = $level;
503 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Expose script_parameters as JOB_PARAMETER_<NAME> env vars.
504 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
505 $param =~ tr/a-z/A-Z/;
506 $ENV{"JOB_PARAMETER_$param"} = $value;
508 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
509 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
510 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
511 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
517 "--nodelist=".$childnode->{name},
518 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
519 "--job-name=$job_id.$id.$$",
521 my @execargs = qw(sh);
522 my $build_script_to_send = "";
524 "mkdir -p $ENV{CRUNCH_TMP}/revision "
525 ."&& cd $ENV{CRUNCH_TMP} ";
528 $build_script_to_send = $build_script;
532 elsif (!$skip_install)
537 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
539 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
540 ." && ./installrevision "
544 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
546 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
547 my @execargs = ('bash', '-c', $command);
548 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# ---- parent process: record child in %proc and mark the slot busy.
552 if (!defined $childpid)
559 $proc{$childpid} = { jobstep => $id,
562 jobstepname => "$job_id.$id.$childpid",
564 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
565 $slot[$childslot]->{pid} = $childpid;
567 Log ($id, "child $childpid started on $childslotname");
568 $Jobstep->{attempts} ++;
569 $Jobstep->{starttime} = time;
570 $Jobstep->{node} = $childnode->{name};
571 $Jobstep->{slotindex} = $childslot;
572 delete $Jobstep->{stderr};
573 delete $Jobstep->{finishtime};
575 splice @jobstep_todo, $todo_ptr, 1;
578 $progress_is_dirty = 1;
# --- Between dispatches: reap children, honor freeze/info signals, manage
# --- node hold lists, abort the round on excessive failure rates; then
# --- finish the round, collate output, and record success/failure.
582 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
584 last THISROUND if $main::please_freeze;
585 if ($main::please_info)
587 $main::please_info = 0;
591 update_progress_stats();
599 update_progress_stats();
600 select (undef, undef, undef, 0.1);
602 elsif (time - $progress_stats_updated >= 30)
604 update_progress_stats();
# Failure-rate circuit breaker: too many repeated task failures ends the round.
606 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
607 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
609 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
610 .($thisround_failed+$thisround_succeeded)
611 .") -- giving up on this round";
612 Log (undef, $message);
616 # move slots from freeslot to holdslot (or back to freeslot) if necessary
617 for (my $i=$#freeslot; $i>=0; $i--) {
618 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
619 push @holdslot, (splice @freeslot, $i, 1);
622 for (my $i=$#holdslot; $i>=0; $i--) {
623 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
624 push @freeslot, (splice @holdslot, $i, 1);
628 # give up if no nodes are succeeding
629 if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
630 my $message = "Every node has failed -- giving up on this round";
631 Log (undef, $message);
# Round teardown: release holds, reset streaks, wait for remaining children.
638 push @freeslot, splice @holdslot;
639 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
642 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
645 goto THISROUND if $main::please_continue;
646 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
651 update_progress_stats();
652 select (undef, undef, undef, 0.1);
653 killem (keys %proc) if $main::please_freeze;
657 update_progress_stats();
658 freeze_if_want_freeze();
661 if (!defined $success)
664 $thisround_succeeded == 0 &&
665 ($thisround_failed == 0 || $thisround_failed > 4))
667 my $message = "stop because $thisround_failed tasks failed and none succeeded";
668 Log (undef, $message);
# More levels remain: loop back for the next round.
677 goto ONELEVEL if !defined $success;
680 release_allocation();
682 $Job->{'output'} = &collate_output();
683 $Job->{'success'} = 0 if !$Job->{'output'};
686 if ($Job->{'output'})
# NOTE(review): system() returns the child's exit status, not its stdout, so
# 'manifest_text' gets a status code here; capturing whget output (backticks
# or open '-|') looks like the intent -- confirm against full source.
688 $arv->{'collections'}->{'create'}->execute('collection' => {
689 'uuid' => $Job->{'output'},
690 'manifest_text' => system("whget", $Job->{arvados_task}->{output}),
695 Log (undef, "finish");
697 $Job->{'success'} = $Job->{'output'} && $success;
# Refresh the job's tasks_summary counters (todo/done/running) and log a
# one-line status. Throttled by $progress_is_dirty; records the update time
# so the main loop can rate-limit calls.
705 sub update_progress_stats
707 $progress_stats_updated = time;
708 return if !$progress_is_dirty;
# "running" = total slots minus free minus held.
709 my ($todo, $done, $running) = (scalar @jobstep_todo,
710 scalar @jobstep_done,
711 scalar @slot - scalar @freeslot - scalar @holdslot);
712 $Job->{'tasks_summary'} ||= {};
713 $Job->{'tasks_summary'}->{'todo'} = $todo;
714 $Job->{'tasks_summary'}->{'done'} = $done;
715 $Job->{'tasks_summary'}->{'running'} = $running;
717 Log (undef, "status: $done done, $running running, $todo todo");
718 $progress_is_dirty = 0;
# --- Child reaper (body of the reap sub; header outside this view): harvest
# --- one exited child, classify success/failure from the task record, apply
# --- node-fault heuristics, requeue failures, and enqueue newly created
# --- subtasks.
725 my $pid = waitpid (-1, WNOHANG);
726 return 0 if $pid <= 0;
728 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
730 . $slot[$proc{$pid}->{slot}]->{cpu});
731 my $jobstepid = $proc{$pid}->{jobstep};
732 my $elapsed = time - $proc{$pid}->{time};
733 my $Jobstep = $jobstep[$jobstepid];
736 my $exitinfo = "exit $exitcode";
# Authoritative success flag comes from the task record, not the exit code.
737 $Jobstep->{arvados_task}->reload;
738 my $success = $Jobstep->{arvados_task}->{success};
740 if (!defined $success) {
741 # task did not indicate one way or the other --> fail
742 $Jobstep->{arvados_task}->{success} = 0;
743 $Jobstep->{arvados_task}->save;
747 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
# A node_fail attempt doesn't count against the step's retry budget.
751 --$Jobstep->{attempts} if $Jobstep->{node_fail};
753 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
755 # Check for signs of a failed or misconfigured node
756 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
757 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
758 # Don't count this against jobstep failure thresholds if this
759 # node is already suspected faulty and srun exited quickly
760 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
762 $Jobstep->{attempts} > 1) {
763 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
764 --$Jobstep->{attempts};
766 ban_node_by_slot($proc{$pid}->{slot});
# Failed step goes back on the todo queue for another attempt.
769 push @jobstep_todo, $jobstepid;
770 Log ($jobstepid, "failure in $elapsed seconds");
771 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: clear the node's losing streak and any hold.
775 ++$thisround_succeeded;
776 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
777 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
778 push @jobstep_done, $jobstepid;
779 Log ($jobstepid, "success in $elapsed seconds");
781 $Jobstep->{exitcode} = $exitcode;
782 $Jobstep->{finishtime} = time;
783 process_stderr ($jobstepid, $success);
784 Log ($jobstepid, "output " . $Jobstep->{arvados_task}->{output});
# Release the pipe and the slot.
786 close $reader{$jobstepid};
787 delete $reader{$jobstepid};
788 delete $slot[$proc{$pid}->{slot}]->{pid};
789 push @freeslot, $proc{$pid}->{slot};
# Tasks created by the finished task become new jobsteps at their own level.
793 my $newtask_list = $arv->{'job_tasks'}->{'index'}->execute('where' => {
794 'created_by_job_task' => $Jobstep->{arvados_task}->{uuid}
796 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
798 'level' => $arvados_task->{'sequence'},
800 'arvados_task' => $arvados_task
802 push @jobstep, $jobstep;
803 push @jobstep_todo, $#jobstep;
806 $progress_is_dirty = 1;
# --- squeue reconciliation (sub header outside this view): kill srun procs
# --- whose killtime has passed, and schedule kills for child procs that
# --- squeue no longer reports. Rate-limited: kill list every 4s, squeue
# --- itself every 60s.
813 # return if the kill list was checked <4 seconds ago
814 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
818 $squeue_kill_checked = time;
820 # use killem() on procs whose killtime is reached
823 if (exists $proc{$_}->{killtime}
824 && $proc{$_}->{killtime} <= time)
830 # return if the squeue was checked <60 seconds ago
831 if (defined $squeue_checked && $squeue_checked > time - 60)
835 $squeue_checked = time;
839 # here is an opportunity to check for mysterious problems with local procs
843 # get a list of steps still running
# "&& echo ok" sentinel distinguishes an empty queue from a failed squeue.
844 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
846 if ($squeue[-1] ne "ok")
852 # which of my jobsteps are running, according to squeue?
856 if (/^(\d+)\.(\d+) (\S+)/)
858 if ($1 eq $ENV{SLURM_JOBID})
865 # which of my active child procs (>60s old) were not mentioned by squeue?
868 if ($proc{$_}->{time} < time - 60
869 && !exists $ok{$proc{$_}->{jobstepname}}
870 && !exists $proc{$_}->{killtime})
872 # kill this proc if it hasn't exited in 30 seconds
873 $proc{$_}->{killtime} = time + 30;
# Cancel the SLURM allocation for this job (no-op outside SLURM, per the
# $have_slurm guard presumably between these lines -- confirm).
879 sub release_allocation
883 Log (undef, "release job allocation");
884 system "scancel $ENV{SLURM_JOBID}";
# --- Pipe drain (enclosing sub header outside this view): non-blocking read
# --- of each child's stderr pipe into its jobstep buffer, capped at 16KB by
# --- discarding the oldest 8KB.
892 foreach my $job (keys %reader)
895 while (0 < sysread ($reader{$job}, $buf, 8192))
897 print STDERR $buf if $ENV{CRUNCH_DEBUG};
898 $jobstep[$job]->{stderr} .= $buf;
899 preprocess_stderr ($job);
900 if (length ($jobstep[$job]->{stderr}) > 16384)
902 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer: log each line and
# react to srun error patterns (expired allocation => request freeze;
# node failure / step-creation failure => mark node_fail and ban the node).
911 sub preprocess_stderr
915 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
917 if ($line =~ /\+\+\+mr/) {
# Remove the consumed line (plus its newline) from the buffer.
920 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
921 Log ($job, "stderr $line");
922 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
924 $main::please_freeze = 1;
926 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
927 $jobstep[$job]->{node_fail} = 1;
928 ban_node_by_slot($jobstep[$job]->{slotindex});
# --- process_stderr tail (header outside this view): flush any remaining
# --- buffered stderr lines to the log after the child exits.
938 preprocess_stderr ($job);
941 Log ($job, "stderr $_");
942 } split ("\n", $jobstep[$job]->{stderr});
# --- collate_output body (header outside this view): merge every completed
# --- step's output into a single Warehouse manifest and return its key.
948 my $whc = Warehouse->new;
949 Log (undef, "collate");
950 $whc->write_start (1);
# Skip steps without output or with nonzero exit codes.
954 next if !exists $_->{arvados_task}->{output} || $_->{exitcode} != 0;
955 my $output = $_->{arvados_task}->{output};
# Output that is not a bare 32-hex locator is treated as literal manifest text.
956 if ($output !~ /^[0-9a-f]{32}/)
958 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
959 $whc->write_data ($output);
# Single-step job whose output is already a locator: pass it through as-is.
961 elsif (@jobstep == 1)
963 $joboutput = $output;
966 elsif (defined (my $outblock = $whc->fetch_block ($output)))
968 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
969 $whc->write_data ($outblock);
# Fetch failure is recorded inline in the manifest rather than aborting.
973 my $errstr = $whc->errstr;
974 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
978 $joboutput = $whc->write_finish if !defined $joboutput;
981 Log (undef, "outputkey $joboutput");
982 $Job->{'output'} = $joboutput;
987 Log (undef, "outputkey undef");
# --- killem body (header outside this view): escalating kill for each pid --
# --- SIGINT first, SIGTERM after 4s, SIGKILL after 4 more; SIGINT is sent
# --- twice because srun needs two to really interrupt.
997 my $sig = 2; # SIGINT first
998 if (exists $proc{$_}->{"sent_$sig"} &&
999 time - $proc{$_}->{"sent_$sig"} > 4)
1001 $sig = 15; # SIGTERM if SIGINT doesn't work
1003 if (exists $proc{$_}->{"sent_$sig"} &&
1004 time - $proc{$_}->{"sent_$sig"} > 4)
1006 $sig = 9; # SIGKILL if SIGTERM doesn't work
1008 if (!exists $proc{$_}->{"sent_$sig"})
1010 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1012 select (undef, undef, undef, 0.1);
1015 kill $sig, $_; # srun wants two SIGINT to really interrupt
1017 $proc{$_}->{"sent_$sig"} = time;
1018 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# --- fhbits fragment (enclosing sub outside this view): build a select()
# --- bit vector from filehandles.
1028 vec($bits,fileno($_),1) = 1;
1034 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split and logged one line at a time (recursively,
# presumably -- the per-line call is outside this view).
1036 if ($_[1] =~ /\n/) {
1037 for my $line (split (/\n/, $_[1])) {
# Force STDERR autoflush without disturbing the selected handle.
1042 my $fh = select STDERR; $|=1; select $fh;
1043 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable bytes as \NNN octal so the log stays one line per entry.
1044 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1047 if ($metastream || -t STDERR) {
1048 my @gmtime = gmtime;
1049 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1050 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Timestamp prefix only on interactive terminals; metastream gets it always.
1052 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1054 return if !$metastream;
1055 $metastream->write_data ($datetime . " " . $message);
# (Re)establish the warehouse DB handle if it has gone away; no-op for
# jobs without a server-side uuid. InactiveDestroy keeps forked children
# from closing the parent's handle on exit.
1059 sub reconnect_database
1061 return if !$job_has_uuid;
# Cheap liveness probe; reconnect only if it fails.
1062 return if ($dbh && $dbh->do ("select now()"));
1065 $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1067 $dbh->{InactiveDestroy} = 1;
1070 warn ($DBI::errstr);
1073 croak ($DBI::errstr) if !$dbh;
# --- dbh_do (header outside this view): run a statement, retrying once
# --- after reconnect if the server went away mid-connection.
1079 return 1 if !$job_has_uuid;
1080 my $ret = $dbh->do (@_);
1081 return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1082 reconnect_database();
1083 return $dbh->do (@_);
# --- croak body (header outside this view): log the fatal message with the
# --- caller's location, checkpoint unfinished work, then save the log.
1089 my ($package, $file, $line) = caller;
1090 my $message = "@_ at $file line $line\n";
1091 Log (undef, $message);
1092 freeze() if @jobstep_todo;
1093 collate_output() if @jobstep_todo;
1095 save_meta() if $metastream;
# --- cleanup (header outside this view): mark the job-manager row finished
# --- and flag any still-running mrjob row as failed, in the legacy DB.
1102 return if !$job_has_uuid || !$dbh;
1104 reconnect_database();
1106 $sth = $dbh->prepare ("update mrjobmanager set finishtime=now() where id=?");
1107 $sth->execute ($jobmanager_id);
1108 $sth = $dbh->prepare ("update mrjob set success=0, finishtime=now() where id=? and jobmanager_id=? and finishtime is null");
1109 $sth->execute ($job_id, $jobmanager_id);
# --- save_meta: persist the log stream to Warehouse and record its locator
# --- on the job. A checkpoint save works on a copy so logging can continue.
1115 my $justcheckpoint = shift; # false if this will be the last meta saved
1116 my $m = $metastream;
1117 $m = $m->copy if $justcheckpoint;
1119 my $loglocator = $m->as_key;
1120 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1121 Log (undef, "meta key is $loglocator");
1122 $Job->{'log'} = $loglocator;
# If a freeze was requested (SIGINT/SIGQUIT/SIGTSTP), release the node
# allocation, kill the given srun procs, and reap everything before the
# (out-of-view) freeze+stop sequence.
1127 sub freeze_if_want_freeze
1129 if ($main::please_freeze)
1131 release_allocation();
1134 # kill some srun procs before freeze+stop
# Register any extra pids passed in so killem() covers them too.
1135 map { $proc{$_} = {} } @_;
1138 killem (keys %proc);
1139 select (undef, undef, undef, 0.1);
1141 while (($died = waitpid (-1, WNOHANG)) > 0)
1143 delete $proc{$died};
# --- freeze (header outside this view): serialize job + jobstep state into
# --- a Warehouse stream ("state.txt") so a later run can thaw and resume.
# --- Note the visible "Freeze not implemented" log at 1158 -- an early-exit
# --- path presumably guards the real implementation; confirm in full source.
1158 Log (undef, "Freeze not implemented")
1162 Log (undef, "freeze");
1164 my $freezer = new Warehouse::Stream (whc => $whc);
1166 $freezer->name (".");
1167 $freezer->write_start ("state.txt");
# Job section: key=value lines, freezequoted, skipping the "id" field.
1169 $freezer->write_data (join ("\n",
1173 $_ . "=" . freezequote($Job->{$_})
1174 } grep { $_ ne "id" } keys %$Job) . "\n\n");
# One record per jobstep, excluding volatile fields.
1176 foreach my $Jobstep (@jobstep)
1178 my $str = join ("\n",
1181 $_ . "=" . freezequote ($Jobstep->{$_})
1183 $_ !~ /^stderr|slotindex|node_fail/
1185 $freezer->write_data ($str."\n\n");
1187 if (@jobstep_tomerge)
1189 $freezer->write_data
1190 ("merge $jobstep_tomerge_level "
1191 . freezequote (join ("\n",
1192 map { freezequote ($_) } @jobstep_tomerge))
1196 $freezer->write_finish;
1197 my $frozentokey = $freezer->as_key;
1199 Log (undef, "frozento key is $frozentokey");
1200 dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1201 $frozentokey, $job_id);
# Also replicate the frozen state into Keep (3 nodes) for durability.
1202 my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1203 Log (undef, "frozento+K key is $kfrozentokey");
1204 return $frozentokey;
# --- thaw (header outside this view): inverse of freeze(); restore job
# --- fields, merge state, and jobsteps from a frozen state key. The visible
# --- "Thaw not implemented" croak at 1210 presumably sits on a guard path;
# --- confirm in full source.
1210 croak ("Thaw not implemented");
1214 Log (undef, "thaw from $key");
1219 @jobstep_tomerge = ();
1220 $jobstep_tomerge_level = 0;
1223 my $stream = new Warehouse::Stream ( whc => $whc,
1224 hash => [split (",", $key)] );
# State file records are separated by blank lines ("\n\n").
1226 while (my $dataref = $stream->read_until (undef, "\n\n"))
1228 if ($$dataref =~ /^job /)
1230 foreach (split ("\n", $$dataref))
1232 my ($k, $v) = split ("=", $_, 2);
1233 $frozenjob->{$k} = freezeunquote ($v);
1238 if ($$dataref =~ /^merge (\d+) (.*)/)
1240 $jobstep_tomerge_level = $1;
# Doubly-quoted: the merge list is quoted as a whole and per element.
1242 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1247 foreach (split ("\n", $$dataref))
1249 my ($k, $v) = split ("=", $_, 2);
1250 $Jobstep->{$k} = freezeunquote ($v) if $k;
# Attempts reset on thaw so restored steps get a fresh retry budget.
1252 $Jobstep->{attempts} = 0;
1253 push @jobstep, $Jobstep;
# NOTE(review): string "eq" against "0" treats exitcode undef/"" as not-done;
# presumably intentional (only a recorded exit 0 counts as done).
1255 if ($Jobstep->{exitcode} eq "0")
1257 push @jobstep_done, $#jobstep;
1261 push @jobstep_todo, $#jobstep;
# Restore the authoritative script fields from the frozen job record.
1265 foreach (qw (script script_version script_parameters))
1267 $Job->{$_} = $frozenjob->{$_};
# --- freezeunquote fragment: reverse the \-escaping applied by freezequote.
1285 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# --- srun body (header outside this view): run @execargs, prefixed with
# --- @srunargs when under SLURM; optional $stdin is fed to the child via a
# --- forked writer on STDIN. With $opts->{fork} it system()s and returns;
# --- otherwise it execs (the exec itself is outside this view).
1292 my $srunargs = shift;
1293 my $execargs = shift;
1294 my $opts = shift || {};
1296 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Debug trace of the full command line, quoting args containing spaces.
1297 print STDERR (join (" ",
1298 map { / / ? "'$_'" : $_ }
1301 if $ENV{CRUNCH_DEBUG};
# open STDIN, "-|" forks: the child (return value 0) writes $stdin into the
# pipe that becomes this process's STDIN.
1303 if (defined $stdin) {
1304 my $child = open STDIN, "-|";
1305 defined $child or die "no fork: $!";
1307 print $stdin or die $!;
1308 close STDOUT or die $!;
1313 return system (@$args) if $opts->{fork};
# Reached only if exec failed; ENV size logged because oversized
# environments are a known cause of exec failure.
1316 warn "ENV size is ".length(join(" ",%ENV));
1317 die "exec failed: $!: @$args";
1321 sub ban_node_by_slot {
1322 # Don't start any new jobsteps on this node for 60 seconds
1324 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1325 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1331 # checkout-and-build
# --- Embedded per-node build script (shipped via srun stdin, see DATA read
# --- at original line 289): clone/update CRUNCH_SRC at CRUNCH_SRC_COMMIT,
# --- run install.sh or autotests, then atomically record the built commit
# --- in "$destdir.commit" via rename.
1335 my $destdir = $ENV{"CRUNCH_SRC"};
1336 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1337 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Lock file serializes concurrent builds on the same node.
1339 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Fast path: the requested commit is already built (symlink matches).
1341 if (readlink ("$destdir.commit") eq $commit) {
1345 open STDOUT, ">", "$destdir.log";
1346 open STDERR, ">&STDOUT";
1348 if (-d "$destdir/.git") {
1349 chdir $destdir or die "chdir $destdir: $!";
1350 if (0 != system (qw(git remote set-url origin), $repo)) {
1351 # awful... for old versions of git that don't know "remote set-url"
1352 shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1355 elsif ($repo && $commit)
1357 shell_or_die('git', 'clone', $repo, $destdir);
1358 chdir $destdir or die "chdir $destdir: $!";
1359 shell_or_die(qw(git config clean.requireForce false));
1362 die "$destdir does not exist, and no repo/commit specified -- giving up";
# Invalidate the old commit marker before mutating the working tree.
1366 unlink "$destdir.commit";
1367 shell_or_die (qw(git stash));
1368 shell_or_die (qw(git clean -d -x));
1369 shell_or_die (qw(git fetch origin));
1370 shell_or_die (qw(git checkout), $commit);
1374 chomp ($pwd = `pwd`);
1375 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1377 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1379 shell_or_die ("./tests/autotests.sh", $install_dir);
1380 } elsif (-e "./install.sh") {
1381 shell_or_die ("./install.sh", $install_dir);
# Atomic marker update: write .commit.new, then rename over .commit.
1385 unlink "$destdir.commit.new";
1386 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1387 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# --- shell_or_die fragment: echo command in DEBUG mode, die with exit info.
1396 if ($ENV{"DEBUG"}) {
1397 print STDERR "@_\n";
1400 or die "@_ failed: $! exit 0x".sprintf("%x",$?);