2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --uuid x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
20 =head1 RUNNING JOBS LOCALLY
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
26 If the job succeeds, the job's output locator is printed on stdout.
28 While the job is running, the following signals are accepted:
32 =item control-C, SIGINT, SIGQUIT
34 Save a checkpoint, terminate any job tasks that are running, and stop.
38 Save a checkpoint and continue.
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
# --- Environment, options, API client, job fetch ------------------------
# NOTE(review): sampled listing; interior lines are missing.  Comments
# annotate only the statements visible here.
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
57 use Warehouse::Stream;
# All scratch space lives under $TMPDIR/crunch-job.
59 $ENV{"TMPDIR"} ||= "/tmp";
60 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
61 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
# Result deliberately unchecked -- the directory may already exist.
62 mkdir ($ENV{"CRUNCH_TMP"});
# Command-line options (usage described in the POD header).
67 GetOptions('force-unlock' => \$force_unlock,
69 'resume-stash=s' => \$resume_stash,
# Running under SLURM iff both env vars are present.
72 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# $jobspec is either a bare uuid (fetch via API) or a JSON blob (local run).
73 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
82 $main::ENV{CRUNCH_DEBUG} = 1;
86 $main::ENV{CRUNCH_DEBUG} = 0;
# API client, plus the Warehouse stream that accumulates this run's log.
91 my $arv = Arvados->new;
92 my $metastream = Warehouse::Stream->new;
94 $metastream->write_start('log.txt');
# uuid mode: fetch job record and current user from the API server.
103 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
104 $User = $arv->{'users'}->{'current'}->execute;
# Refuse to start a job someone else owns/started unless --force-unlock.
105 if (!$force_unlock) {
106 if ($Job->{'is_locked_by'}) {
107 croak("Job is locked: " . $Job->{'is_locked_by'});
# BUGFIX: `$x ne undef` string-compares against the empty string (and emits
# an uninitialized-value warning); a defined-but-false 'success' value (0)
# would still be caught only by accident, and the comparison never tests
# null-ness.  The intent -- croak when the flag is non-null -- is defined().
109 if (defined $Job->{'success'}) {
110 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
# Remaining pre-flight lock checks: a job already marked running or with a
# start time set was claimed by another dispatcher.
112 if ($Job->{'running'}) {
113 croak("Job 'running' flag is already set");
115 if ($Job->{'started_at'}) {
116 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local mode: the job spec was passed as JSON on the command line.
122 $Job = JSON::decode_json($jobspec);
# Required keys; note: map in void context used purely for the croak side
# effect (a for-loop would be the more idiomatic form).
126 map { croak ("No $_ specified") unless $Job->{$_} }
127 qw(script script_version script_parameters);
# Synthesize a local job id ("host-tTIME-pPID") when running without an
# API-server uuid.
# BUGFIX: `hostname -s` ends with a newline which sprintf embedded in the
# MIDDLE of the generated id ("host\n-t...-p...").  The original outer
# chomp() could only strip a trailing newline, so the embedded one was
# never removed.  Chomp the hostname before interpolating.
130 if (!defined $Job->{'uuid'}) {
131 chomp (my $hostname = `hostname -s`);
131 $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
134 $job_id = $Job->{'uuid'};
# --- Discover the node/CPU allocation and build @node / @slot ------------
# resource_limits.max_tasks_per_node caps concurrent tasks per node
# (0 = no cap).
138 $Job->{'resource_limits'} ||= {};
139 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
140 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
143 Log (undef, "check slurm allocation");
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Local fallback: count processors in /proc/cpuinfo (at least 1).
150 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151 push @sinfo, "$localcpus localhost";
# Under SLURM, ask sinfo for "<cpus> <nodelist>" lines for our allocation.
153 if (exists $ENV{SLURM_NODELIST})
155 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Each sinfo line: cpu count + compressed nodelist (e.g. "compute[1-3,5]").
159 my ($ncpus, $slurm_nodelist) = split;
160 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated (possibly bracketed) group off the nodelist
# per iteration.
163 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
# Expand bracketed ranges like "[2-4,7]" into individual host names.
166 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
169 foreach (split (",", $ranges))
182 push @nodelist, map {
184 $n =~ s/\[[-,\d]+\]/$_/;
# No brackets: the group is already a single host name.
191 push @nodelist, $nodelist;
# One slot per (node, cpu) pair; slots are the scheduling unit below.
194 foreach my $nodename (@nodelist)
196 Log (undef, "node $nodename - $ncpus slots");
197 my $node = { name => $nodename,
201 foreach my $cpu (1..$ncpus)
203 push @slot, { node => $node,
207 push @node, @nodelist;
212 # Ensure that we get one jobstep running on each allocated node before
213 # we start overloading nodes with concurrent steps
# Sorting by cpu number interleaves nodes: all cpu-1 slots first, etc.
215 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
222 # Claim this job, and make sure nobody else does
# Mark ourselves as owner and reset run-state before saving.
224 $Job->{'is_locked_by'} = $User->{'uuid'};
225 $Job->{'started_at'} = time;
226 $Job->{'running'} = 1;
227 $Job->{'success'} = undef;
228 $Job->{'tasks_summary'} = { 'failed' => 0,
# Verify the save stuck AND that the lock really belongs to us.
# BUGFIX: `==` compares UUID *strings* numerically -- both sides evaluate
# to 0 (with warnings), so the test always passed even if another user had
# grabbed the lock.  String comparison (`eq`) is required.
232 unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {
233 croak("Error while updating / locking job");
# --- Signal handlers, per-run flags, and initial jobstep -----------------
238 Log (undef, "start");
# INT/QUIT/TSTP request a checkpoint+stop; ALRM requests a status dump;
# CONT resumes; TERM aborts via croak (see POD "RUNNING JOBS LOCALLY").
239 $SIG{'INT'} = sub { $main::please_freeze = 1; };
240 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
241 $SIG{'TERM'} = \&croak;
242 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
243 $SIG{'ALRM'} = sub { $main::please_info = 1; };
244 $SIG{'CONT'} = sub { $main::please_continue = 1; };
245 $main::please_freeze = 0;
246 $main::please_info = 0;
247 $main::please_continue = 0;
248 my $jobsteps_must_output_keys = 0;	# becomes 1 when any task outputs a key
# Copy NOCACHE* knobs from the job record into our environment.
250 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
251 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
252 $ENV{"JOB_UUID"} = $job_id;
# Jobstep bookkeeping: todo/done queues plus merge state for thawed jobs.
256 my @jobstep_todo = ();
257 my @jobstep_done = ();
258 my @jobstep_tomerge = ();
259 my $jobstep_tomerge_level = 0;
261 my $squeue_kill_checked;
262 my $output_in_keep = 0;
# Resuming from a checkpoint? Restore state instead of creating task 0.
266 if (defined $Job->{thawedfromkey})
268 thaw ($Job->{thawedfromkey});
# Fresh run: create the level-0 seed task and queue it.
272 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
273 'job_uuid' => $Job->{'uuid'},
278 push @jobstep, { 'level' => 0,
280 'arvados_task' => $first_task,
282 push @jobstep_todo, 0;
# --- Work-dir cleanup and source-tree install on all nodes ---------------
# The builder script lives after __DATA__ (see end of file); it is piped
# to perl on each node via srun's stdin.
289 $build_script = <DATA>;
293 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
# A local absolute path as "revision" means: run in place, skip install.
295 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
298 $ENV{"CRUNCH_SRC"} = $Job->{revision};
302 Log (undef, "Install revision ".$Job->{revision});
303 my $nodelist = join(",", @node);
305 # Clean out crunch_tmp/work and crunch_tmp/opt
# Fork a child to run the cleanup srun; parent polls for completion while
# honoring freeze requests.
307 my $cleanpid = fork();
310 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
311 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
316 last if $cleanpid == waitpid (-1, WNOHANG);
317 freeze_if_want_freeze ($cleanpid);
318 select (undef, undef, undef, 0.1);
320 Log (undef, "Clean-work-dir exited $?");
322 # Install requested code version
325 my @srunargs = ("srun",
326 "--nodelist=$nodelist",
327 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
331 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
# Resolve script_version (branch / abbrev / svn r#) to a full commit hash.
334 my $treeish = $Job->{'script_version'};
335 my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
336 # Todo: let script_version specify alternate repo
337 $ENV{"CRUNCH_SRC_URL"} = $repo;
339 # Create/update our clone of the remote git repo
341 if (!-d $ENV{"CRUNCH_SRC"}) {
342 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
343 or croak ("git clone $repo failed: exit ".($?>>8));
344 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
346 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
348 # If this looks like a subversion r#, look for it in git-svn commit messages
350 if ($treeish =~ m{^\d{1,4}$}) {
351 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
353 if ($gitlog =~ /^[a-f0-9]{40}$/) {
355 Log (undef, "Using commit $commit for revision $treeish");
359 # If that didn't work, try asking git to look it up as a tree-ish.
361 if (!defined $commit) {
363 my $cooked_treeish = $treeish;
364 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
365 # Looks like a git branch name -- make sure git knows it's
366 # relative to the remote repo
367 $cooked_treeish = "origin/$treeish";
370 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
372 if ($found =~ /^[0-9a-f]{40}$/s) {
374 if ($commit ne $treeish) {
375 # Make sure we record the real commit id in the database,
376 # frozentokey, logs, etc. -- instead of an abbreviation or a
377 # branch name which can become ambiguous or point to a
378 # different commit in the future.
379 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
380 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this inner `if ($commit ne $treeish)` repeats the test on
# the line above (original 374) -- apparently redundant; confirm against
# the unsampled file before simplifying.
381 if ($commit ne $treeish) {
382 $Job->{'script_version'} = $commit;
383 $Job->save() or croak("Error while updating job");
# With a resolved commit, pipe the builder script to perl on every node.
389 if (defined $commit) {
390 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
391 @execargs = ("sh", "-c",
392 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
395 croak ("could not figure out commit id for $treeish");
# Same fork/poll pattern as the cleanup step above.
398 my $installpid = fork();
399 if ($installpid == 0)
401 srun (\@srunargs, \@execargs, {}, $build_script);
406 last if $installpid == waitpid (-1, WNOHANG);
407 freeze_if_want_freeze ($installpid);
408 select (undef, undef, undef, 0.1);
410 Log (undef, "Install exited $?");
# Record the job parameters in the log for posterity.
415 foreach (qw (script script_version script_parameters resource_limits))
417 Log (undef, $_ . " " . $Job->{$_});
419 foreach (split (/\n/, $Job->{knobs}))
421 Log (undef, "knob " . $_);
# --- Per-round (per-level) counters and scheduling state -----------------
432 my $thisround_succeeded = 0;
433 my $thisround_failed = 0;
434 my $thisround_failed_multiple = 0;
# Run lowest level first; ties broken by task index for determinism.
436 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
437 or $a <=> $b } @jobstep_todo;
438 my $level = $jobstep[$jobstep_todo[0]]->{level};
439 Log (undef, "start level $level");
# All slots start free; progress flags drive periodic status updates.
444 my @freeslot = (0..$#slot);
447 my $progress_is_dirty = 1;
448 my $progress_stats_updated = 0;
450 update_progress_stats();
# --- THISROUND: fork one child (srun) per ready jobstep at this level ----
455 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
457 $main::please_continue = 0;
459 my $id = $jobstep_todo[$todo_ptr];
460 my $Jobstep = $jobstep[$id];
# Steps from a later level stay queued for the next round.
461 if ($Jobstep->{level} != $level)
# Hard retry cap per jobstep.
465 if ($Jobstep->{attempts} > 9)
467 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
# Non-blocking pipe: child's stdout/stderr flow back to the parent reader.
472 pipe $reader{$id}, "writer" or croak ($!);
473 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
474 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Take the first free slot; slot => (node, cpu).
476 my $childslot = $freeslot[0];
477 my $childnode = $slot[$childslot]->{node};
478 my $childslotname = join (".",
479 $slot[$childslot]->{node}->{name},
480 $slot[$childslot]->{cpu});
481 my $childpid = fork();
# ---- child process: reset handlers, rewire stdio into the pipe ----
484 $SIG{'INT'} = 'DEFAULT';
485 $SIG{'QUIT'} = 'DEFAULT';
486 $SIG{'TERM'} = 'DEFAULT';
488 foreach (values (%reader))
# Keep "writer" open across exec so srun output reaches the parent.
492 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
493 open(STDOUT,">&writer");
494 open(STDERR,">&writer");
# Task environment passed down to the user script.
499 delete $ENV{"GNUPGHOME"};
500 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
501 $ENV{"TASK_QSEQUENCE"} = $id;
502 $ENV{"TASK_SEQUENCE"} = $level;
503 $ENV{"JOB_SCRIPT"} = $Job->{script};
504 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
505 $param =~ tr/a-z/A-Z/;
506 $ENV{"JOB_PARAMETER_$param"} = $value;
508 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
509 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
510 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
511 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
# srun pins the task to one cpu on the chosen node.
517 "--nodelist=".$childnode->{name},
518 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
519 "--job-name=$job_id.$id.$$",
521 my @execargs = qw(sh);
522 my $build_script_to_send = "";
524 "mkdir -p $ENV{CRUNCH_TMP}/revision "
525 ."&& cd $ENV{CRUNCH_TMP} ";
528 $build_script_to_send = $build_script;
532 elsif (!$skip_install)
537 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
539 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
540 ." && ./installrevision "
544 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
# exec replaces this child with the user script.
546 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
547 my @execargs = ('bash', '-c', $command);
548 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# ---- parent process: record the child and mark the slot busy ----
552 if (!defined $childpid)
559 $proc{$childpid} = { jobstep => $id,
562 jobstepname => "$job_id.$id.$childpid",
564 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
565 $slot[$childslot]->{pid} = $childpid;
567 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
568 Log ($id, "child $childpid started on $childslotname");
569 $Jobstep->{attempts} ++;
570 $Jobstep->{starttime} = time;
571 $Jobstep->{node} = $childnode->{name};
572 $Jobstep->{slotindex} = $childslot;
573 delete $Jobstep->{stderr};
574 delete $Jobstep->{finishtime};
# Remove from the queue; back the pointer up since we spliced.
576 splice @jobstep_todo, $todo_ptr, 1;
579 $progress_is_dirty = 1;
585 last THISROUND if $main::please_freeze;
586 if ($main::please_info)
588 $main::please_info = 0;
592 update_progress_stats();
600 update_progress_stats();
601 select (undef, undef, undef, 0.1);
603 elsif (time - $progress_stats_updated >= 30)
605 update_progress_stats();
607 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
608 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
610 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
611 .($thisround_failed+$thisround_succeeded)
612 .") -- giving up on this round";
613 Log (undef, $message);
617 # move slots from freeslot to holdslot (or back to freeslot) if necessary
618 for (my $i=$#freeslot; $i>=0; $i--) {
619 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
620 push @holdslot, (splice @freeslot, $i, 1);
623 for (my $i=$#holdslot; $i>=0; $i--) {
624 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
625 push @freeslot, (splice @holdslot, $i, 1);
629 # give up if no nodes are succeeding
630 if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
631 my $message = "Every node has failed -- giving up on this round";
632 Log (undef, $message);
639 push @freeslot, splice @holdslot;
640 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
643 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
646 goto THISROUND if $main::please_continue;
647 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
652 update_progress_stats();
653 select (undef, undef, undef, 0.1);
654 killem (keys %proc) if $main::please_freeze;
658 update_progress_stats();
659 freeze_if_want_freeze();
# --- Decide round outcome; loop to next level or finish ------------------
662 if (!defined $success)
# Everything failed (or a few failed with no successes): stop the job.
665 $thisround_succeeded == 0 &&
666 ($thisround_failed == 0 || $thisround_failed > 4))
668 my $message = "stop because $thisround_failed tasks failed and none succeeded";
669 Log (undef, $message);
# Undecided => more levels remain; run the next one.
678 goto ONELEVEL if !defined $success;
681 release_allocation();
# Final job output is the collated manifest of all task outputs.
683 $Job->{'output'} = &collate_output();
684 $Job->{'success'} = $Job->{'output'} && $success;
# Register the output manifest as a collection on the API server.
# BUGFIX: system() returns the child's *exit status* (0 on success), never
# its stdout, so 'manifest_text' was being set to a number instead of the
# manifest.  Capture whget's stdout with backticks instead.
687 if ($Job->{'output'})
689 $arv->{'collections'}->{'create'}->execute('collection' => {
690 'uuid' => $Job->{'output'},
691 'manifest_text' => `whget $Job->{'output'}`,
695 Log (undef, "finish");
# Push todo/done/running counts into the job record and the log.
# No-op unless something changed since the last call ($progress_is_dirty).
702 sub update_progress_stats
704 $progress_stats_updated = time;
705 return if !$progress_is_dirty;
# running = total slots minus free minus held.
706 my ($todo, $done, $running) = (scalar @jobstep_todo,
707 scalar @jobstep_done,
708 scalar @slot - scalar @freeslot - scalar @holdslot);
709 $Job->{'tasks_summary'} ||= {};
710 $Job->{'tasks_summary'}->{'todo'} = $todo;
711 $Job->{'tasks_summary'}->{'done'} = $done;
712 $Job->{'tasks_summary'}->{'running'} = $running;
714 Log (undef, "status: $done done, $running running, $todo todo");
715 $progress_is_dirty = 0;
# --- Reap one exited child; classify success/failure; enqueue new tasks --
# (sub header not visible in this sampled view; body annotated in place.)
722 my $pid = waitpid (-1, WNOHANG);
723 return 0 if $pid <= 0;
725 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
727 . $slot[$proc{$pid}->{slot}]->{cpu});
728 my $jobstepid = $proc{$pid}->{jobstep};
729 my $elapsed = time - $proc{$pid}->{time};
730 my $Jobstep = $jobstep[$jobstepid];
733 my $exitinfo = "exit $exitcode";
# The task records its own success flag via the API; reload to see it.
734 $Jobstep->{'arvados_task'}->reload;
735 my $success = $Jobstep->{'arvados_task'}->{success};
737 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
739 if (!defined $success) {
740 # task did not indicate one way or the other --> fail
741 $Jobstep->{'arvados_task'}->{success} = 0;
742 $Jobstep->{'arvados_task'}->save;
# A node-level failure doesn't count against the jobstep's attempts.
748 --$Jobstep->{attempts} if $Jobstep->{node_fail};
750 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
752 # Check for signs of a failed or misconfigured node
753 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
754 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
755 # Don't count this against jobstep failure thresholds if this
756 # node is already suspected faulty and srun exited quickly
757 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
759 $Jobstep->{attempts} > 1) {
760 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
761 --$Jobstep->{attempts};
763 ban_node_by_slot($proc{$pid}->{slot});
# Failed step goes back on the todo queue for retry.
766 push @jobstep_todo, $jobstepid;
767 Log ($jobstepid, "failure in $elapsed seconds");
768 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: clear the node's failure streak and record completion.
772 ++$thisround_succeeded;
773 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
774 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
775 push @jobstep_done, $jobstepid;
776 Log ($jobstepid, "success in $elapsed seconds");
778 $Jobstep->{exitcode} = $exitcode;
779 $Jobstep->{finishtime} = time;
780 process_stderr ($jobstepid, $success);
781 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the pipe and slot for reuse.
783 close $reader{$jobstepid};
784 delete $reader{$jobstepid};
785 delete $slot[$proc{$pid}->{slot}]->{pid};
786 push @freeslot, $proc{$pid}->{slot};
# Any tasks this step created become new jobsteps, ordered by qsequence.
790 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
792 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
794 'order' => 'qsequence'
796 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
798 'level' => $arvados_task->{'sequence'},
800 'arvados_task' => $arvados_task
802 push @jobstep, $jobstep;
803 push @jobstep_todo, $#jobstep;
806 $progress_is_dirty = 1;
# --- Cross-check our children against squeue; kill orphaned srun procs ---
813 # return if the kill list was checked <4 seconds ago
814 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
818 $squeue_kill_checked = time;
820 # use killem() on procs whose killtime is reached
823 if (exists $proc{$_}->{killtime}
824 && $proc{$_}->{killtime} <= time)
830 # return if the squeue was checked <60 seconds ago
831 if (defined $squeue_checked && $squeue_checked > time - 60)
835 $squeue_checked = time;
839 # here is an opportunity to check for mysterious problems with local procs
843 # get a list of steps still running
# Trailing "echo ok" distinguishes a truncated read from real output.
844 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
846 if ($squeue[-1] ne "ok")
852 # which of my jobsteps are running, according to squeue?
856 if (/^(\d+)\.(\d+) (\S+)/)
858 if ($1 eq $ENV{SLURM_JOBID})
865 # which of my active child procs (>60s old) were not mentioned by squeue?
868 if ($proc{$_}->{time} < time - 60
869 && !exists $ok{$proc{$_}->{jobstepname}}
870 && !exists $proc{$_}->{killtime})
872 # kill this proc if it hasn't exited in 30 seconds
873 $proc{$_}->{killtime} = time + 30;
# Give the whole SLURM allocation back (scancel is a no-op off-SLURM;
# guarded by $have_slurm in the unsampled body, presumably -- verify).
879 sub release_allocation
883 Log (undef, "release job allocation");
884 system "scancel $ENV{SLURM_JOBID}";
# --- Drain child pipes; split and log stderr lines -----------------------
# Non-blocking sysread per child; accumulated text parsed line-by-line.
892 foreach my $job (keys %reader)
895 while (0 < sysread ($reader{$job}, $buf, 8192))
897 print STDERR $buf if $ENV{CRUNCH_DEBUG};
898 $jobstep[$job]->{stderr} .= $buf;
899 preprocess_stderr ($job);
# Cap the retained buffer at 16K by discarding the oldest 8K.
900 if (length ($jobstep[$job]->{stderr}) > 16384)
902 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer, log them, and
# watch for srun messages that signal allocation expiry or node failure.
911 sub preprocess_stderr
915 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
917 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
918 Log ($job, "stderr $line");
919 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
# Allocation expired: checkpoint and stop.
921 $main::please_freeze = 1;
923 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
# Node-level failure: don't charge the jobstep; quarantine the node.
924 $jobstep[$job]->{node_fail} = 1;
925 ban_node_by_slot($jobstep[$job]->{slotindex});
# (process_stderr) flush any remaining partial lines after child exit.
935 preprocess_stderr ($job);
938 Log ($job, "stderr $_");
939 } split ("\n", $jobstep[$job]->{stderr});
# --- Combine every successful task's output into one manifest ------------
# Returns the warehouse key of the combined stream (or undef on failure).
945 my $whc = Warehouse->new;
946 Log (undef, "collate");
947 $whc->write_start (1);
# Skip tasks with no output, unsuccessful tasks, and nonzero exits.
951 next if (!exists $_->{'arvados_task'}->{output} ||
952 !$_->{'arvados_task'}->{'success'} ||
953 $_->{'exitcode'} != 0);
954 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare block locator is already manifest text.
955 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
957 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
958 $whc->write_data ($output);
# Single-task job: its locator IS the job output, verbatim.
960 elsif (@jobstep == 1)
962 $joboutput = $output;
# Otherwise fetch the block and splice its contents into our stream.
965 elsif (defined (my $outblock = $whc->fetch_block ($output)))
967 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
968 $whc->write_data ($outblock);
# Fetch failure is recorded in-band so the manifest shows the gap.
972 my $errstr = $whc->errstr;
973 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
977 $joboutput = $whc->write_finish if !defined $joboutput;
980 Log (undef, "output $joboutput");
981 $Job->{'output'} = $joboutput;
986 Log (undef, "output undef");
# --- killem: escalate INT -> TERM -> KILL against listed pids ------------
# Each signal gets ~4 seconds before escalating to the next.
996 my $sig = 2; # SIGINT first
997 if (exists $proc{$_}->{"sent_$sig"} &&
998 time - $proc{$_}->{"sent_$sig"} > 4)
1000 $sig = 15; # SIGTERM if SIGINT doesn't work
1002 if (exists $proc{$_}->{"sent_$sig"} &&
1003 time - $proc{$_}->{"sent_$sig"} > 4)
1005 $sig = 9; # SIGKILL if SIGTERM doesn't work
1007 if (!exists $proc{$_}->{"sent_$sig"})
1009 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1011 select (undef, undef, undef, 0.1);
1014 kill $sig, $_; # srun wants two SIGINT to really interrupt
1016 $proc{$_}->{"sent_$sig"} = time;
1017 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# (select-vector setup for a reader poll elsewhere in the unsampled body.)
1027 vec($bits,fileno($_),1) = 1;
# --- Log ($jobstep_id, $message): timestamped line to stderr + metastream
1033 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split and logged one line at a time.
1035 if ($_[1] =~ /\n/) {
1036 for my $line (split (/\n/, $_[1])) {
1041 my $fh = select STDERR; $|=1; select $fh;
# NOTE(review): 4 format specifiers but ($job_id, $$, @_) may supply an
# undef jobstep id as the third value -- presumably formatted elsewhere;
# confirm against the unsampled file.
1042 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable characters as \NNN octal.
1043 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1046 if ($metastream || -t STDERR) {
1047 my @gmtime = gmtime;
1048 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1049 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Timestamp only on interactive stderr; metastream always gets it below.
1051 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1053 return if !$metastream;
1054 $metastream->write_data ($datetime . " " . $message);
# --- Database helpers, fatal-error path, job cleanup, log save -----------
# Re-establish the warehouse DB handle if it has gone stale.
1058 sub reconnect_database
1060 return if !$job_has_uuid;
# Cheap liveness probe; keep the current handle if it still answers.
1061 return if ($dbh && $dbh->do ("select now()"));
1064 $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
# Survive fork(): children must not destroy the parent's connection.
1066 $dbh->{InactiveDestroy} = 1;
1069 warn ($DBI::errstr);
1072 croak ($DBI::errstr) if !$dbh;
# dbh_do: run a statement, retrying once after "server has gone away".
1078 return 1 if !$job_has_uuid;
1079 my $ret = $dbh->do (@_);
1080 return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1081 reconnect_database();
1082 return $dbh->do (@_);
# croak override: log caller location, checkpoint what we can, then die.
1088 my ($package, $file, $line) = caller;
1089 my $message = "@_ at $file line $line\n";
1090 Log (undef, $message);
1091 freeze() if @jobstep_todo;
1092 collate_output() if @jobstep_todo;
1094 save_meta() if $metastream;
# cleanup: mark the job record finished-and-failed.
1101 return if !$job_has_uuid;
1103 $Job->{'running'} = 0;
1104 $Job->{'success'} = 0;
1105 $Job->{'finished_at'} = time;
# save_meta: flush the metastream log to Keep and record its locator.
1112 my $justcheckpoint = shift; # false if this will be the last meta saved
1113 my $m = $metastream;
# Checkpoints write a *copy* so the live stream keeps accumulating.
1114 $m = $m->copy if $justcheckpoint;
1116 my $loglocator = $m->as_key;
1117 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1118 Log (undef, "meta key is $loglocator");
1119 $Job->{'log'} = $loglocator;
# --- Checkpointing: freeze_if_want_freeze + freeze -----------------------
# If a freeze was requested (SIGINT etc.), kill the given srun procs,
# reap everything, save state, and stop.
1124 sub freeze_if_want_freeze
1126 if ($main::please_freeze)
1128 release_allocation();
1131 # kill some srun procs before freeze+stop
1132 map { $proc{$_} = {} } @_;
1135 killem (keys %proc);
1136 select (undef, undef, undef, 0.1);
1138 while (($died = waitpid (-1, WNOHANG)) > 0)
1140 delete $proc{$died};
1155 Log (undef, "Freeze not implemented");
# freeze: serialize job + jobstep state into a warehouse stream
# ("state.txt") and store its key in the database and in Keep.
1159 Log (undef, "freeze");
1161 my $freezer = new Warehouse::Stream (whc => $whc);
1163 $freezer->name (".");
1164 $freezer->write_start ("state.txt");
# Job record: key=value lines, excluding the DB id.
1166 $freezer->write_data (join ("\n",
1170 $_ . "=" . freezequote($Job->{$_})
1171 } grep { $_ ne "id" } keys %$Job) . "\n\n");
# One paragraph per jobstep; transient fields are not persisted.
1173 foreach my $Jobstep (@jobstep)
1175 my $str = join ("\n",
1178 $_ . "=" . freezequote ($Jobstep->{$_})
1180 $_ !~ /^stderr|slotindex|node_fail/
1182 $freezer->write_data ($str."\n\n");
# Pending merge state, if any, as a "merge <level> <list>" record.
1184 if (@jobstep_tomerge)
1186 $freezer->write_data
1187 ("merge $jobstep_tomerge_level "
1188 . freezequote (join ("\n",
1189 map { freezequote ($_) } @jobstep_tomerge))
1193 $freezer->write_finish;
1194 my $frozentokey = $freezer->as_key;
1196 Log (undef, "frozento key is $frozentokey");
1197 dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1198 $frozentokey, $job_id);
# Also replicate the checkpoint into Keep (3 nodes) for durability.
1199 my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1200 Log (undef, "frozento+K key is $kfrozentokey");
1201 return $frozentokey;
# --- thaw: restore job/jobstep state from a frozen checkpoint key --------
# Currently disabled: croaks before doing anything (kept for reference).
1207 croak ("Thaw not implemented");
1211 Log (undef, "thaw from $key");
1216 @jobstep_tomerge = ();
1217 $jobstep_tomerge_level = 0;
# Read the state.txt stream back, paragraph by paragraph ("\n\n").
1220 my $stream = new Warehouse::Stream ( whc => $whc,
1221 hash => [split (",", $key)] );
1223 while (my $dataref = $stream->read_until (undef, "\n\n"))
# "job " paragraph: restore job-record fields.
1225 if ($$dataref =~ /^job /)
1227 foreach (split ("\n", $$dataref))
1229 my ($k, $v) = split ("=", $_, 2);
1230 $frozenjob->{$k} = freezeunquote ($v);
# "merge" paragraph: restore pending merge list and level.
1235 if ($$dataref =~ /^merge (\d+) (.*)/)
1237 $jobstep_tomerge_level = $1;
1239 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
# Otherwise: one jobstep paragraph of key=value lines.
1244 foreach (split ("\n", $$dataref))
1246 my ($k, $v) = split ("=", $_, 2);
1247 $Jobstep->{$k} = freezeunquote ($v) if $k;
# Attempts restart from zero after a thaw.
1249 $Jobstep->{attempts} = 0;
1250 push @jobstep, $Jobstep;
# Note: exitcode compared as a string; "0" means done, anything else
# (including undef/"") goes back on the todo queue.
1252 if ($Jobstep->{exitcode} eq "0")
1254 push @jobstep_done, $#jobstep;
1258 push @jobstep_todo, $#jobstep;
# Only these fields of the frozen job override the live record.
1262 foreach (qw (script script_version script_parameters))
1264 $Job->{$_} = $frozenjob->{$_};
# --- freezeunquote / srun / ban_node_by_slot -----------------------------
# Reverse freezequote: "\n" back to newline, "\\" back to backslash.
1282 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# srun ($srunargs, $execargs, $opts, $stdin): run a command, via srun when
# under SLURM, directly otherwise; optionally feed $stdin to the child.
1289 my $srunargs = shift;
1290 my $execargs = shift;
1291 my $opts = shift || {};
1293 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Debug trace of the exact command line (quote args with spaces).
1294 print STDERR (join (" ",
1295 map { / / ? "'$_'" : $_ }
1298 if $ENV{CRUNCH_DEBUG};
# $stdin is delivered by forking a writer child onto our own STDIN.
1300 if (defined $stdin) {
1301 my $child = open STDIN, "-|";
1302 defined $child or die "no fork: $!";
1304 print $stdin or die $!;
1305 close STDOUT or die $!;
# fork option => run via system() and return; otherwise exec (no return).
1310 return system (@$args) if $opts->{fork};
1313 warn "ENV size is ".length(join(" ",%ENV));
1314 die "exec failed: $!: @$args";
1318 sub ban_node_by_slot {
1319 # Don't start any new jobsteps on this node for 60 seconds
1321 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1322 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# --- __DATA__ builder script: runs on each node via `perl -` -------------
# Clones/updates CRUNCH_SRC at CRUNCH_SRC_COMMIT and runs the project's
# install script, recording the built commit in "$destdir.commit".
1328 # checkout-and-build
1332 my $destdir = $ENV{"CRUNCH_SRC"};
1333 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1334 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Lock file serializes concurrent builders on the same node.
1336 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Fast path: the requested commit is already checked out and built.
1338 if (readlink ("$destdir.commit") eq $commit) {
1342 open STDOUT, ">", "$destdir.log";
1343 open STDERR, ">&STDOUT";
1345 if (-d "$destdir/.git") {
1346 chdir $destdir or die "chdir $destdir: $!";
1347 if (0 != system (qw(git remote set-url origin), $repo)) {
1348 # awful... for old versions of git that don't know "remote set-url"
1349 shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1352 elsif ($repo && $commit)
1354 shell_or_die('git', 'clone', $repo, $destdir);
1355 chdir $destdir or die "chdir $destdir: $!";
1356 shell_or_die(qw(git config clean.requireForce false));
1359 die "$destdir does not exist, and no repo/commit specified -- giving up";
# Invalidate the commit marker, then force the tree to the target commit.
1363 unlink "$destdir.commit";
1364 shell_or_die (qw(git stash));
1365 shell_or_die (qw(git clean -d -x));
1366 shell_or_die (qw(git fetch origin));
1367 shell_or_die (qw(git checkout), $commit);
1371 chomp ($pwd = `pwd`);
1372 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Prefer install.sh; fall back to tests/autotests.sh.
1374 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1376 shell_or_die ("./tests/autotests.sh", $install_dir);
1377 } elsif (-e "./install.sh") {
1378 shell_or_die ("./install.sh", $install_dir);
# Atomically publish the built commit via symlink rename.
1382 unlink "$destdir.commit.new";
1383 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1384 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# shell_or_die: echo in DEBUG mode; die with exit status on failure.
1393 if ($ENV{"DEBUG"}) {
1394 print STDERR "@_\n";
1397 or die "@_ failed: $! exit 0x".sprintf("%x",$?);