2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --uuid x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
20 =head1 RUNNING JOBS LOCALLY
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
26 If the job succeeds, the job's output locator is printed on stdout.
28 While the job is running, the following signals are accepted:
32 =item control-C, SIGINT, SIGQUIT
34 Save a checkpoint, terminate any job tasks that are running, and stop.
38 Save a checkpoint and continue.
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
57 use Warehouse::Stream;
58 use IPC::System::Simple qw(capturex);
# --- Environment and client setup ---
# Scratch layout: TMPDIR defaults to /tmp; CRUNCH_TMP/CRUNCH_WORK derive from it.
60 $ENV{"TMPDIR"} ||= "/tmp";
61 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
62 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
# NOTE(review): mkdir return value is unchecked; a failure here surfaces later.
63 mkdir ($ENV{"CRUNCH_TMP"});
# Command-line options (option list appears truncated in this excerpt).
68 GetOptions('force-unlock' => \$force_unlock,
70            'resume-stash=s' => \$resume_stash,
# We are under a SLURM allocation iff both variables are present.
73 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A bare uuid-like jobspec means "fetch from Arvados"; otherwise inline JSON.
74 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
83 $main::ENV{CRUNCH_DEBUG} = 1;
87 $main::ENV{CRUNCH_DEBUG} = 0;
# API client and warehouse stream that collects this run's log ("log.txt").
92 my $arv = Arvados->new;
93 my $metastream = Warehouse::Stream->new;
95 $metastream->write_start('log.txt');
# --- Obtain the job record and refuse to double-run it ---
104 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
105 $User = $arv->{'users'}->{'current'}->execute;
# Unless --force-unlock was given, bail out if another crunch-job owns this job.
106 if (!$force_unlock) {
107 if ($Job->{'is_locked_by'}) {
108 croak("Job is locked: " . $Job->{'is_locked_by'});
# NOTE(review): "ne undef" stringifies undef to "" and warns under warnings;
# "defined $Job->{'success'}" looks like the intended test -- confirm.
110 if ($Job->{'success'} ne undef) {
111 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
113 if ($Job->{'running'}) {
114 croak("Job 'running' flag is already set");
116 if ($Job->{'started_at'}) {
117 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local-mode path: jobspec is an inline JSON job description.
123 $Job = JSON::decode_json($jobspec);
127 map { croak ("No $_ specified") unless $Job->{$_} }
128 qw(script script_version script_parameters);
# Synthesize a uuid for local jobs: <shorthostname>-t<time>-p<pid>.
131 if (!defined $Job->{'uuid'}) {
132 chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
135 $job_id = $Job->{'uuid'};
# Resource limits: max_tasks_per_node of 0 means "no per-node cap".
139 $Job->{'resource_limits'} ||= {};
140 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
141 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
# --- Discover allocated nodes and build the per-CPU slot list ---
144 Log (undef, "check slurm allocation");
147 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Fallback: local host's CPU count (at least 1) when not under SLURM.
151 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
152 push @sinfo, "$localcpus localhost";
154 if (exists $ENV{SLURM_NODELIST})
156 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Each sinfo line is "<cpus> <nodelist>"; clamp cpus to max_tasks_per_node.
160 my ($ncpus, $slurm_nodelist) = split;
161 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Expand SLURM's compressed hostlist syntax, e.g. "compute[1-3,5],login0".
164 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
167 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
170 foreach (split (",", $ranges))
# Substitute each expanded index back into the bracketed hostname template.
183 push @nodelist, map {
185 $n =~ s/\[[-,\d]+\]/$_/;
192 push @nodelist, $nodelist;
# One slot per CPU per node; a running task occupies exactly one slot.
195 foreach my $nodename (@nodelist)
197 Log (undef, "node $nodename - $ncpus slots");
198 my $node = { name => $nodename,
202 foreach my $cpu (1..$ncpus)
204 push @slot, { node => $node,
208 push @node, @nodelist;
213 # Ensure that we get one jobstep running on each allocated node before
214 # we start overloading nodes with concurrent steps
216 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
223 # Claim this job, and make sure nobody else does
# Mark the job locked/running; the save() must succeed and the lock must
# still be ours afterwards, otherwise another dispatcher won the race.
225 $Job->{'is_locked_by'} = $User->{'uuid'};
226 $Job->{'started_at'} = time;
227 $Job->{'running'} = 1;
228 $Job->{'success'} = undef;
229 $Job->{'tasks_summary'} = { 'failed' => 0,
# NOTE(review): uuids compared with numeric "=="; "eq" is the usual string
# comparison for uuids -- confirm intent before changing.
233 unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
234 croak("Error while updating / locking job");
239 Log (undef, "start");
# Signals (see POD above): INT/QUIT/TSTP request checkpoint+stop, ALRM a
# status dump, CONT a resume; TERM aborts via croak.
240 $SIG{'INT'} = sub { $main::please_freeze = 1; };
241 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
242 $SIG{'TERM'} = \&croak;
243 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
244 $SIG{'ALRM'} = sub { $main::please_info = 1; };
245 $SIG{'CONT'} = sub { $main::please_continue = 1; };
246 $main::please_freeze = 0;
247 $main::please_info = 0;
248 $main::please_continue = 0;
249 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy NOCACHE* knobs from the job record into the environment.
251 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
252 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
253 $ENV{"JOB_UUID"} = $job_id;
# Jobstep bookkeeping: todo/done queues plus merge state used by thaw().
257 my @jobstep_todo = ();
258 my @jobstep_done = ();
259 my @jobstep_tomerge = ();
260 my $jobstep_tomerge_level = 0;
262 my $squeue_kill_checked;
263 my $output_in_keep = 0;
# Resume from previously frozen state if the job record points at one;
# otherwise seed the queue with a single level-0 task.
267 if (defined $Job->{thawedfromkey})
269 thaw ($Job->{thawedfromkey});
273 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
274 'job_uuid' => $Job->{'uuid'},
279 push @jobstep, { 'level' => 0,
281 'arvados_task' => $first_task,
283 push @jobstep_todo, 0;
# --- Fetch and install the requested script_version on all nodes ---
# The per-node build script lives after __DATA__ at the end of this file.
290 $build_script = <DATA>;
294 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
# A local absolute path with no SLURM allocation means "run in place".
296 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
299 $ENV{"CRUNCH_SRC"} = $Job->{revision};
303 Log (undef, "Install revision ".$Job->{revision});
304 my $nodelist = join(",", @node);
306 # Clean out crunch_tmp/work and crunch_tmp/opt
308 my $cleanpid = fork();
# Child: unmount any leftover task mounts, then wipe work/opt on every node.
311 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
312 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
# Parent: poll for the cleaner to exit, honouring freeze requests meanwhile.
317 last if $cleanpid == waitpid (-1, WNOHANG);
318 freeze_if_want_freeze ($cleanpid);
319 select (undef, undef, undef, 0.1);
321 Log (undef, "Clean-work-dir exited $?");
323 # Install requested code version
326 my @srunargs = ("srun",
327 "--nodelist=$nodelist",
328 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
330 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
331 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
332 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
335 my $treeish = $Job->{'script_version'};
336 my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
337 # Todo: let script_version specify alternate repo
338 $ENV{"CRUNCH_SRC_URL"} = $repo;
340 # Create/update our clone of the remote git repo
342 if (!-d $ENV{"CRUNCH_SRC"}) {
343 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
344 or croak ("git clone $repo failed: exit ".($?>>8));
345 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
347 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
349 # If this looks like a subversion r#, look for it in git-svn commit messages
351 if ($treeish =~ m{^\d{1,4}$}) {
352 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
354 if ($gitlog =~ /^[a-f0-9]{40}$/) {
356 Log (undef, "Using commit $commit for revision $treeish");
360 # If that didn't work, try asking git to look it up as a tree-ish.
362 if (!defined $commit) {
364 my $cooked_treeish = $treeish;
365 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
366 # Looks like a git branch name -- make sure git knows it's
367 # relative to the remote repo
368 $cooked_treeish = "origin/$treeish";
371 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
373 if ($found =~ /^[0-9a-f]{40}$/s) {
375 if ($commit ne $treeish) {
376 # Make sure we record the real commit id in the database,
377 # frozentokey, logs, etc. -- instead of an abbreviation or a
378 # branch name which can become ambiguous or point to a
379 # different commit in the future.
380 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
381 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this guard repeats the "$commit ne $treeish" test from line
# 375 above -- possibly redundant; verify against the full file.
382 if ($commit ne $treeish) {
383 $Job->{'script_version'} = $commit;
384 $Job->save() or croak("Error while updating job");
# With a resolved commit, run the build script on every node via srun
# ("perl -" reads it from stdin).
390 if (defined $commit) {
391 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
392 @execargs = ("sh", "-c",
393 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
396 croak ("could not figure out commit id for $treeish");
399 my $installpid = fork();
400 if ($installpid == 0)
402 srun (\@srunargs, \@execargs, {}, $build_script);
# Parent: poll for the installer to exit, honouring freeze requests.
407 last if $installpid == waitpid (-1, WNOHANG);
408 freeze_if_want_freeze ($installpid);
409 select (undef, undef, undef, 0.1);
411 Log (undef, "Install exited $?");
# Record the job parameters in the log for posterity.
416 foreach (qw (script script_version script_parameters resource_limits))
418 Log (undef, $_ . " " . $Job->{$_});
420 foreach (split (/\n/, $Job->{knobs}))
422 Log (undef, "knob " . $_);
# --- Main scheduling loop: dispatch one round of tasks at the current level ---
433 my $thisround_succeeded = 0;
434 my $thisround_failed = 0;
435 my $thisround_failed_multiple = 0;
# Process the lowest level first; within a level, in queue order.
437 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
438 or $a <=> $b } @jobstep_todo;
439 my $level = $jobstep[$jobstep_todo[0]]->{level};
440 Log (undef, "start level $level");
445 my @freeslot = (0..$#slot);
448 my $progress_is_dirty = 1;
449 my $progress_stats_updated = 0;
451 update_progress_stats();
456 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
458 $main::please_continue = 0;
460 my $id = $jobstep_todo[$todo_ptr];
461 my $Jobstep = $jobstep[$id];
# Only dispatch steps belonging to the current level in this round.
462 if ($Jobstep->{level} != $level)
# Retry budget: a step that has failed 10 times is abandoned.
466 if ($Jobstep->{attempts} > 9)
468 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
# Non-blocking pipe so the child's output streams back without stalling us.
473 pipe $reader{$id}, "writer" or croak ($!);
474 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
475 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
477 my $childslot = $freeslot[0];
478 my $childnode = $slot[$childslot]->{node};
479 my $childslotname = join (".",
480 $slot[$childslot]->{node}->{name},
481 $slot[$childslot]->{cpu});
482 my $childpid = fork();
# Child: reset signal handlers, point STDOUT/STDERR at the pipe, build the
# task environment, then run the script under srun on the chosen node/slot.
485 $SIG{'INT'} = 'DEFAULT';
486 $SIG{'QUIT'} = 'DEFAULT';
487 $SIG{'TERM'} = 'DEFAULT';
489 foreach (values (%reader))
493 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
494 open(STDOUT,">&writer");
495 open(STDERR,">&writer");
500 delete $ENV{"GNUPGHOME"};
501 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
502 $ENV{"TASK_QSEQUENCE"} = $id;
503 $ENV{"TASK_SEQUENCE"} = $level;
504 $ENV{"JOB_SCRIPT"} = $Job->{script};
# script_parameters are exported as JOB_PARAMETER_<UPPERCASED_NAME>.
505 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
506 $param =~ tr/a-z/A-Z/;
507 $ENV{"JOB_PARAMETER_$param"} = $value;
509 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
510 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
511 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
512 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
# One task, one cpu, pinned to the chosen node.
518 "--nodelist=".$childnode->{name},
519 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
520 "--job-name=$job_id.$id.$$",
522 my @execargs = qw(sh);
523 my $build_script_to_send = "";
525 "mkdir -p $ENV{CRUNCH_TMP}/revision "
526 ."&& cd $ENV{CRUNCH_TMP} ";
529 $build_script_to_send = $build_script;
533 elsif (!$skip_install)
538 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
540 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
541 ." && ./installrevision "
545 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
547 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
548 my @execargs = ('bash', '-c', $command);
549 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# Parent: record the child against its slot and jobstep.
553 if (!defined $childpid)
560 $proc{$childpid} = { jobstep => $id,
563 jobstepname => "$job_id.$id.$childpid",
# A slot must never host two children at once.
565 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
566 $slot[$childslot]->{pid} = $childpid;
568 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
569 Log ($id, "child $childpid started on $childslotname");
570 $Jobstep->{attempts} ++;
571 $Jobstep->{starttime} = time;
572 $Jobstep->{node} = $childnode->{name};
573 $Jobstep->{slotindex} = $childslot;
574 delete $Jobstep->{stderr};
575 delete $Jobstep->{finishtime};
577 splice @jobstep_todo, $todo_ptr, 1;
580 $progress_is_dirty = 1;
# --- Wait for children, apply failure heuristics, finish the round/job ---
584 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
586 last THISROUND if $main::please_freeze;
587 if ($main::please_info)
589 $main::please_info = 0;
593 update_progress_stats();
601 update_progress_stats();
602 select (undef, undef, undef, 0.1);
# Refresh progress stats at least every 30 seconds even when idle.
604 elsif (time - $progress_stats_updated >= 30)
606 update_progress_stats();
# Abort the round when repeated failures dominate successes.
608 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
609 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
611 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
612 .($thisround_failed+$thisround_succeeded)
613 .") -- giving up on this round";
614 Log (undef, $message);
618 # move slots from freeslot to holdslot (or back to freeslot) if necessary
619 for (my $i=$#freeslot; $i>=0; $i--) {
620 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
621 push @holdslot, (splice @freeslot, $i, 1);
624 for (my $i=$#holdslot; $i>=0; $i--) {
625 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
626 push @freeslot, (splice @holdslot, $i, 1);
630 # give up if no nodes are succeeding
631 if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
632 my $message = "Every node has failed -- giving up on this round";
633 Log (undef, $message);
# Round over: release held slots and reset per-node failure streaks.
640 push @freeslot, splice @holdslot;
641 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
644 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
647 goto THISROUND if $main::please_continue;
648 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
653 update_progress_stats();
654 select (undef, undef, undef, 0.1);
655 killem (keys %proc) if $main::please_freeze;
659 update_progress_stats();
660 freeze_if_want_freeze();
# Decide whether to stop: nothing succeeded and failures are absent/excessive.
663 if (!defined $success)
666 $thisround_succeeded == 0 &&
667 ($thisround_failed == 0 || $thisround_failed > 4))
669 my $message = "stop because $thisround_failed tasks failed and none succeeded";
670 Log (undef, $message);
# More levels to run? Loop back; otherwise collate and register the output.
679 goto ONELEVEL if !defined $success;
682 release_allocation();
684 $Job->{'output'} = &collate_output();
685 $Job->{'success'} = $Job->{'output'} && $success;
# Register the output manifest as an Arvados collection (best-effort:
# failure is logged at line 698, not fatal).
688 if ($Job->{'output'})
691 my $manifest_text = capturex("whget", $Job->{'output'});
692 $arv->{'collections'}->{'create'}->execute('collection' => {
693 'uuid' => $Job->{'output'},
694 'manifest_text' => $manifest_text,
698 Log (undef, "Failed to register output manifest: $@");
702 Log (undef, "finish");
# Update the job record's tasks_summary counters and log a status line.
# No-op unless $progress_is_dirty; always stamps $progress_stats_updated.
709 sub update_progress_stats
711 $progress_stats_updated = time;
712 return if !$progress_is_dirty;
# running = total slots minus free minus held.
713 my ($todo, $done, $running) = (scalar @jobstep_todo,
714 scalar @jobstep_done,
715 scalar @slot - scalar @freeslot - scalar @holdslot);
716 $Job->{'tasks_summary'} ||= {};
717 $Job->{'tasks_summary'}->{'todo'} = $todo;
718 $Job->{'tasks_summary'}->{'done'} = $done;
719 $Job->{'tasks_summary'}->{'running'} = $running;
721 Log (undef, "status: $done done, $running running, $todo todo");
722 $progress_is_dirty = 0;
# Reap one exited child (non-blocking) and update jobstep/node/slot state.
# Returns 0 when no child has exited. (Sub header not visible in this
# excerpt -- this is the body of the child-reaping handler.)
729 my $pid = waitpid (-1, WNOHANG);
730 return 0 if $pid <= 0;
732 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
734 . $slot[$proc{$pid}->{slot}]->{cpu});
735 my $jobstepid = $proc{$pid}->{jobstep};
736 my $elapsed = time - $proc{$pid}->{time};
737 my $Jobstep = $jobstep[$jobstepid];
740 my $exitinfo = "exit $exitcode";
# Re-fetch the task record: the task itself reports success/failure.
741 $Jobstep->{'arvados_task'}->reload;
742 my $success = $Jobstep->{'arvados_task'}->{success};
744 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
746 if (!defined $success) {
747 # task did not indicate one way or the other --> fail
748 $Jobstep->{'arvados_task'}->{success} = 0;
749 $Jobstep->{'arvados_task'}->save;
# Node failures don't count against the jobstep's attempt budget.
755 --$Jobstep->{attempts} if $Jobstep->{node_fail};
757 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
759 # Check for signs of a failed or misconfigured node
760 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
761 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
762 # Don't count this against jobstep failure thresholds if this
763 # node is already suspected faulty and srun exited quickly
764 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
766 $Jobstep->{attempts} > 1) {
767 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
768 --$Jobstep->{attempts};
770 ban_node_by_slot($proc{$pid}->{slot});
# Failed step goes back on the todo queue for retry.
773 push @jobstep_todo, $jobstepid;
774 Log ($jobstepid, "failure in $elapsed seconds");
775 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: clear the node's failure streak and any hold.
779 ++$thisround_succeeded;
780 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
781 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
782 push @jobstep_done, $jobstepid;
783 Log ($jobstepid, "success in $elapsed seconds");
785 $Jobstep->{exitcode} = $exitcode;
786 $Jobstep->{finishtime} = time;
787 process_stderr ($jobstepid, $success);
788 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the pipe and the slot.
790 close $reader{$jobstepid};
791 delete $reader{$jobstepid};
792 delete $slot[$proc{$pid}->{slot}]->{pid};
793 push @freeslot, $proc{$pid}->{slot};
# Enqueue any new tasks this task created, in qsequence order.
797 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
799 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
801 'order' => 'qsequence'
803 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
805 'level' => $arvados_task->{'sequence'},
807 'arvados_task' => $arvados_task
809 push @jobstep, $jobstep;
810 push @jobstep_todo, $#jobstep;
813 $progress_is_dirty = 1;
# Reconcile our child procs against SLURM's squeue: kill procs whose
# killtime has arrived, and schedule a kill for any proc (>60s old) that
# squeue no longer lists. (Sub header not visible in this excerpt.)
820 # return if the kill list was checked <4 seconds ago
821 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
825 $squeue_kill_checked = time;
827 # use killem() on procs whose killtime is reached
830 if (exists $proc{$_}->{killtime}
831 && $proc{$_}->{killtime} <= time)
837 # return if the squeue was checked <60 seconds ago
838 if (defined $squeue_checked && $squeue_checked > time - 60)
842 $squeue_checked = time;
846 # here is an opportunity to check for mysterious problems with local procs
850 # get a list of steps still running
# The trailing "echo ok" sentinel distinguishes an empty step list from a
# failed squeue invocation.
851 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
853 if ($squeue[-1] ne "ok")
859 # which of my jobsteps are running, according to squeue?
# Step ids are "<jobid>.<stepid>"; only count steps of our own SLURM job.
863 if (/^(\d+)\.(\d+) (\S+)/)
865 if ($1 eq $ENV{SLURM_JOBID})
872 # which of my active child procs (>60s old) were not mentioned by squeue?
875 if ($proc{$_}->{time} < time - 60
876 && !exists $ok{$proc{$_}->{jobstepname}}
877 && !exists $proc{$_}->{killtime})
879 # kill this proc if it hasn't exited in 30 seconds
880 $proc{$_}->{killtime} = time + 30;
# Cancel our SLURM allocation via scancel (logged; conditional guard for
# non-SLURM runs is not visible in this excerpt).
886 sub release_allocation
890 Log (undef, "release job allocation");
891 system "scancel $ENV{SLURM_JOBID}";
# Drain every task's pipe (non-blocking sysread), append to that jobstep's
# stderr buffer, and cap the buffer at 16KB by discarding the oldest 8KB.
# (Sub header not visible in this excerpt.)
899 foreach my $job (keys %reader)
902 while (0 < sysread ($reader{$job}, $buf, 8192))
904 print STDERR $buf if $ENV{CRUNCH_DEBUG};
905 $jobstep[$job]->{stderr} .= $buf;
906 preprocess_stderr ($job);
907 if (length ($jobstep[$job]->{stderr}) > 16384)
909 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer, log each one, and
# react to srun error patterns: an expired SLURM job requests a freeze;
# node failure marks the step as node_fail and bans the node's slot.
918 sub preprocess_stderr
922 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the consumed line (plus its newline) from the front of the buffer.
924 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
925 Log ($job, "stderr $line");
926 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
928 $main::please_freeze = 1;
930 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
931 $jobstep[$job]->{node_fail} = 1;
932 ban_node_by_slot($jobstep[$job]->{slotindex});
# Flush any remaining (unterminated) stderr lines for a finished jobstep.
# (Sub header not visible in this excerpt.)
942 preprocess_stderr ($job);
945 Log ($job, "stderr $_");
946 } split ("\n", $jobstep[$job]->{stderr});
# Combine successful tasks' outputs into a single warehouse manifest and
# record the resulting locator on the job. (Sub header not visible.)
952 my $whc = Warehouse->new;
953 Log (undef, "collate");
954 $whc->write_start (1);
# Skip tasks with no output, a false success flag, or a nonzero exit code.
958 next if (!exists $_->{'arvados_task'}->{output} ||
959 !$_->{'arvados_task'}->{'success'} ||
960 $_->{'exitcode'} != 0);
961 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare 32-hex locator is treated as manifest text.
962 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
964 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
965 $whc->write_data ($output);
# Single-task job: the task's output key IS the job output.
967 elsif (@jobstep == 1)
969 $joboutput = $output;
# Otherwise fetch the task's manifest block and splice it into ours.
972 elsif (defined (my $outblock = $whc->fetch_block ($output)))
974 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
975 $whc->write_data ($outblock);
# Fetch failure is recorded inline in the manifest rather than aborting.
979 my $errstr = $whc->errstr;
980 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
984 $joboutput = $whc->write_finish if !defined $joboutput;
987 Log (undef, "output $joboutput");
988 $Job->{'output'} = $joboutput;
993 Log (undef, "output undef");
# Escalating kill for srun children: SIGINT first, then SIGTERM after 4s,
# then SIGKILL after another 4s. (Sub header not visible in this excerpt.)
1003 my $sig = 2; # SIGINT first
1004 if (exists $proc{$_}->{"sent_$sig"} &&
1005 time - $proc{$_}->{"sent_$sig"} > 4)
1007 $sig = 15; # SIGTERM if SIGINT doesn't work
1009 if (exists $proc{$_}->{"sent_$sig"} &&
1010 time - $proc{$_}->{"sent_$sig"} > 4)
1012 $sig = 9; # SIGKILL if SIGTERM doesn't work
# Only send each signal level once; record when it was sent.
1014 if (!exists $proc{$_}->{"sent_$sig"})
1016 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1018 select (undef, undef, undef, 0.1);
1021 kill $sig, $_; # srun wants two SIGINT to really interrupt
1023 $proc{$_}->{"sent_$sig"} = time;
1024 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set each filehandle's bit in a select() bitmask. (Body fragment of the
# fhbits helper; sub header not visible in this excerpt.)
1034 vec($bits,fileno($_),1) = 1;
1040 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split and logged one line at a time.
1042 if ($_[1] =~ /\n/) {
1043 for my $line (split (/\n/, $_[1])) {
# Autoflush STDERR so interleaved child output stays ordered.
1048 my $fh = select STDERR; $|=1; select $fh;
1049 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable bytes as \NNN octal so log lines stay one-per-line.
1050 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1053 if ($metastream || -t STDERR) {
1054 my @gmtime = gmtime;
1055 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1056 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Timestamp is prepended on a tty; the metastream always gets it.
1058 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1060 return if !$metastream;
1061 $metastream->write_data ($datetime . " " . $message);
# (Re)connect to the warehouse database when the handle has gone stale.
# No-op for local (non-uuid) jobs.
1065 sub reconnect_database
1067 return if !$job_has_uuid;
# Cheap liveness probe; reconnect only when it fails.
1068 return if ($dbh && $dbh->do ("select now()"));
1071 $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
# Keep the handle usable across fork(): children must not destroy it.
1073 $dbh->{InactiveDestroy} = 1;
1076 warn ($DBI::errstr);
1079 croak ($DBI::errstr) if !$dbh;
# dbh_do: run a statement, retrying once after "server has gone away".
# (Sub header not visible in this excerpt.)
1085 return 1 if !$job_has_uuid;
1086 my $ret = $dbh->do (@_);
1087 return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1088 reconnect_database();
1089 return $dbh->do (@_);
# Fatal-error path: log the message with the caller's location, checkpoint
# any pending work (freeze + collate + save log), then fall through to
# cleanup. (Sub headers not visible in this excerpt.)
1095 my ($package, $file, $line) = caller;
1096 my $message = "@_ at $file line $line\n";
1097 Log (undef, $message);
1098 freeze() if @jobstep_todo;
1099 collate_output() if @jobstep_todo;
1101 save_meta() if $metastream;
# cleanup body: mark the job failed and finished on the job record.
1108 return if !$job_has_uuid;
1110 $Job->{'running'} = 0;
1111 $Job->{'success'} = 0;
1112 $Job->{'finished_at'} = time;
# Write the metastream log to the warehouse and record its locator on the
# job. A checkpoint saves a copy; the final call consumes $metastream.
# (Sub header not visible in this excerpt.)
1119 my $justcheckpoint = shift; # false if this will be the last meta saved
1120 my $m = $metastream;
1121 $m = $m->copy if $justcheckpoint;
1123 my $loglocator = $m->as_key;
1124 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1125 Log (undef, "meta key is $loglocator");
1126 $Job->{'log'} = $loglocator;
# If a freeze was requested (via signal), release the node allocation,
# kill the given srun pids plus everything in %proc, and reap them all
# before the checkpoint-and-stop sequence.
1131 sub freeze_if_want_freeze
1133 if ($main::please_freeze)
1135 release_allocation();
1138 # kill some srun procs before freeze+stop
# Register caller-supplied pids in %proc so killem() covers them too.
1139 map { $proc{$_} = {} } @_;
1142 killem (keys %proc);
1143 select (undef, undef, undef, 0.1);
1145 while (($died = waitpid (-1, WNOHANG)) > 0)
1147 delete $proc{$died};
# Serialize job + jobstep state into a warehouse stream ("state.txt"),
# record the resulting key in the mrjob table and store it in Keep.
# (Sub header not visible in this excerpt.)
1162 Log (undef, "Freeze not implemented");
1166 Log (undef, "freeze");
1168 my $freezer = new Warehouse::Stream (whc => $whc);
1170 $freezer->name (".");
1171 $freezer->write_start ("state.txt");
# Job section: key=value pairs (freezequoted), blank-line terminated.
1173 $freezer->write_data (join ("\n",
1177 $_ . "=" . freezequote($Job->{$_})
1178 } grep { $_ ne "id" } keys %$Job) . "\n\n");
1180 foreach my $Jobstep (@jobstep)
1182 my $str = join ("\n",
1185 $_ . "=" . freezequote ($Jobstep->{$_})
# Transient per-run fields (stderr, slot index, node_fail) are not frozen.
1187 $_ !~ /^stderr|slotindex|node_fail/
1189 $freezer->write_data ($str."\n\n");
# Pending merge state, if any, gets its own section.
1191 if (@jobstep_tomerge)
1193 $freezer->write_data
1194 ("merge $jobstep_tomerge_level "
1195 . freezequote (join ("\n",
1196 map { freezequote ($_) } @jobstep_tomerge))
1200 $freezer->write_finish;
1201 my $frozentokey = $freezer->as_key;
1203 Log (undef, "frozento key is $frozentokey");
1204 dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1205 $frozentokey, $job_id);
1206 my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1207 Log (undef, "frozento+K key is $kfrozentokey");
1208 return $frozentokey;
# Restore job/jobstep state from a frozen key: parse the "job", "merge"
# and per-jobstep sections and rebuild the todo/done queues.
# (Sub header not visible in this excerpt.)
1214 croak ("Thaw not implemented");
1218 Log (undef, "thaw from $key");
1223 @jobstep_tomerge = ();
1224 $jobstep_tomerge_level = 0;
1227 my $stream = new Warehouse::Stream ( whc => $whc,
1228 hash => [split (",", $key)] );
# Sections are separated by blank lines ("\n\n"), matching freeze() above.
1230 while (my $dataref = $stream->read_until (undef, "\n\n"))
1232 if ($$dataref =~ /^job /)
1234 foreach (split ("\n", $$dataref))
1236 my ($k, $v) = split ("=", $_, 2);
1237 $frozenjob->{$k} = freezeunquote ($v);
1242 if ($$dataref =~ /^merge (\d+) (.*)/)
1244 $jobstep_tomerge_level = $1;
1246 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
# Default section: one jobstep's key=value pairs.
1251 foreach (split ("\n", $$dataref))
1253 my ($k, $v) = split ("=", $_, 2);
1254 $Jobstep->{$k} = freezeunquote ($v) if $k;
# Attempts reset to zero so thawed steps get a fresh retry budget.
1256 $Jobstep->{attempts} = 0;
1257 push @jobstep, $Jobstep;
# Steps that already exited 0 are done; everything else is re-queued.
1259 if ($Jobstep->{exitcode} eq "0")
1261 push @jobstep_done, $#jobstep;
1265 push @jobstep_todo, $#jobstep;
1269 foreach (qw (script script_version script_parameters))
1271 $Job->{$_} = $frozenjob->{$_};
# freezeunquote: reverse freezequote's backslash escaping ("\n" => newline).
# (Sub header not visible in this excerpt.)
1289 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run a command, prefixing the srun arguments only when we actually hold a
# SLURM allocation; optionally feeds $stdin content to the child.
# (Sub header not visible in this excerpt.)
1296 my $srunargs = shift;
1297 my $execargs = shift;
1298 my $opts = shift || {};
1300 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Echo the (shell-quoted) command line when debugging.
1301 print STDERR (join (" ",
1302 map { / / ? "'$_'" : $_ }
1305 if $ENV{CRUNCH_DEBUG};
# If stdin content was supplied, interpose a child that writes it to us.
1307 if (defined $stdin) {
1308 my $child = open STDIN, "-|";
1309 defined $child or die "no fork: $!";
1311 print $stdin or die $!;
1312 close STDOUT or die $!;
# fork mode returns system()'s status; otherwise the exec path below runs.
1317 return system (@$args) if $opts->{fork};
1320 warn "ENV size is ".length(join(" ",%ENV));
1321 die "exec failed: $!: @$args";
1325 sub ban_node_by_slot {
1326 # Don't start any new jobsteps on this node for 60 seconds
1328 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1329 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# --- Embedded per-node build script (read from <DATA>, sent via srun) ---
1335 # checkout-and-build
1339 my $destdir = $ENV{"CRUNCH_SRC"};
1340 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1341 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Lock file serializes concurrent builds on the same node.
1343 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# "$destdir.commit" symlink records the last installed commit; when it
# already matches, the whole build is skipped.
1345 if (readlink ("$destdir.commit") eq $commit) {
1349 open STDOUT, ">", "$destdir.log";
1350 open STDERR, ">&STDOUT";
1352 if (-d "$destdir/.git") {
1353 chdir $destdir or die "chdir $destdir: $!";
1354 if (0 != system (qw(git remote set-url origin), $repo)) {
1355 # awful... for old versions of git that don't know "remote set-url"
1356 shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1359 elsif ($repo && $commit)
1361 shell_or_die('git', 'clone', $repo, $destdir);
1362 chdir $destdir or die "chdir $destdir: $!";
1363 shell_or_die(qw(git config clean.requireForce false));
1366 die "$destdir does not exist, and no repo/commit specified -- giving up";
# Invalidate the commit marker before mutating the working tree.
1370 unlink "$destdir.commit";
1371 shell_or_die (qw(git stash));
1372 shell_or_die (qw(git clean -d -x));
1373 shell_or_die (qw(git fetch origin));
1374 shell_or_die (qw(git checkout), $commit);
1378 chomp ($pwd = `pwd`);
1379 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# install.sh takes precedence; tests/autotests.sh is the fallback.
1381 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1383 shell_or_die ("./tests/autotests.sh", $install_dir);
1384 } elsif (-e "./install.sh") {
1385 shell_or_die ("./install.sh", $install_dir);
# Publish the new commit marker atomically via rename().
1389 unlink "$destdir.commit.new";
1390 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1391 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# shell_or_die: run via system(), echo the command in DEBUG mode, die on
# failure. (Sub header not visible in this excerpt.)
1400 if ($ENV{"DEBUG"}) {
1401 print STDERR "@_\n";
1404 or die "@_ failed: $! exit 0x".sprintf("%x",$?);