2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados and run tasks on compute nodes (typically
11 invoked by the scheduler on the controller):
13 crunch-job --uuid x-y-z
15 Obtain job details from the command line and run tasks on the local machine
16 (typically invoked by an application or developer on a VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
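A fuller illustrative invocation, spelling out the three required fields
(script, script_version and script_parameters); the script name, path and
parameter values here are placeholders:

 crunch-job --job '{"script":"myscript","script_version":"/home/me/src/tree","script_parameters":{"input":"myinputlocator"}}'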
20 =head1 RUNNING JOBS LOCALLY
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when the job finishes.
26 If the job succeeds, the job's output locator is printed on stdout.
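For example, to keep the log in a file and capture only the output locator
(shell redirection shown purely for illustration):

 crunch-job --job '...' 2>crunch-job.log >output-locator.txt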
28 While the job is running, the following signals are accepted:
32 =item control-C, SIGINT, SIGQUIT
34 Save a checkpoint, terminate any job tasks that are running, and stop.
38 Save a checkpoint and continue.
42 Refresh node allocation (i.e., check whether any nodes have been added to
43 or removed from the allocation). Currently this is a no-op.
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
57 use Warehouse::Stream;
59 $ENV{"TMPDIR"} ||= "/tmp";
60 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
61 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
62 mkdir ($ENV{"CRUNCH_TMP"});
67 GetOptions('force-unlock' => \$force_unlock,
69 'resume-stash=s' => \$resume_stash,
72 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
73 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
82 $main::ENV{CRUNCH_DEBUG} = 1;
86 $main::ENV{CRUNCH_DEBUG} = 0;
91 my $arv = Arvados->new;
92 my $metastream = Warehouse::Stream->new;
94 $metastream->write_start('log.txt');
103 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
104 $User = $arv->{'users'}->{'current'}->execute;
105 if (!$force_unlock) {
106 if ($Job->{'is_locked_by'}) {
107 croak("Job is locked: " . $Job->{'is_locked_by'});
109 if (defined $Job->{'success'}) {
110 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
112 if ($Job->{'running'}) {
113 croak("Job 'running' flag is already set");
115 if ($Job->{'started_at'}) {
116 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
122 $Job = JSON::decode_json($jobspec);
126 map { croak ("No $_ specified") unless $Job->{$_} }
127 qw(script script_version script_parameters);
130 if (!defined $Job->{'uuid'}) {
131 chomp (my $hostname = `hostname -s`); $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
134 $job_id = $Job->{'uuid'};
138 $Job->{'resource_limits'} ||= {};
139 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
140 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
143 Log (undef, "check slurm allocation");
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
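# (Sketch, assuming that format: the spec compresses repeats, so "4(x3),2"
# means three nodes with 4 slots each and one node with 2.  Something like
#   my @tasks_per_node;
#   foreach my $part (split /,/, $ENV{SLURM_TASKS_PER_NODE}) {
#     push @tasks_per_node, ($1) x ($2 || 1) if $part =~ /^(\d+)(?:\(x(\d+)\))?$/;
#   }
# could replace the sinfo call below.)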
150 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151 push @sinfo, "$localcpus localhost";
153 if (exists $ENV{SLURM_NODELIST})
155 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
159 my ($ncpus, $slurm_nodelist) = split;
160 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
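# Expand the compressed SLURM nodelist syntax into individual host names,
# e.g. "compute[1-3,5],login1" becomes compute1 compute2 compute3 compute5 login1.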
163 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
166 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
169 foreach (split (",", $ranges))
182 push @nodelist, map {
184 $n =~ s/\[[-,\d]+\]/$_/;
191 push @nodelist, $nodelist;
194 foreach my $nodename (@nodelist)
196 Log (undef, "node $nodename - $ncpus slots");
197 my $node = { name => $nodename,
201 foreach my $cpu (1..$ncpus)
203 push @slot, { node => $node,
207 push @node, @nodelist;
212 # Ensure that we get one jobstep running on each allocated node before
213 # we start overloading nodes with concurrent steps
215 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
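# E.g. with two nodes of two CPUs each the sorted order is
# (nodeA cpu1, nodeB cpu1, nodeA cpu2, nodeB cpu2), so every node receives
# its first concurrent step before any node is asked to run a second one.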
222 # Claim this job, and make sure nobody else does
224 $Job->{'is_locked_by'} = $User->{'uuid'};
225 $Job->{'started_at'} = time;
226 $Job->{'running'} = 1;
227 $Job->{'success'} = undef;
228 $Job->{'tasks_summary'} = { 'failed' => 0,
232 unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {
233 croak("Error while updating / locking job");
238 Log (undef, "start");
239 $SIG{'INT'} = sub { $main::please_freeze = 1; };
240 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
241 $SIG{'TERM'} = \&croak;
242 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
243 $SIG{'ALRM'} = sub { $main::please_info = 1; };
244 $SIG{'CONT'} = sub { $main::please_continue = 1; };
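# These flags are polled in the dispatch loop below: please_freeze saves a
# checkpoint and stops, please_info saves a checkpoint and keeps going, and
# please_continue re-enters the dispatch loop right away.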
245 $main::please_freeze = 0;
246 $main::please_info = 0;
247 $main::please_continue = 0;
248 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
250 foreach (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/; }
251 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
252 $ENV{"JOB_UUID"} = $job_id;
256 my @jobstep_todo = ();
257 my @jobstep_done = ();
258 my @jobstep_tomerge = ();
259 my $jobstep_tomerge_level = 0;
261 my $squeue_kill_checked;
262 my $output_in_keep = 0;
266 if (defined $Job->{thawedfromkey})
268 thaw ($Job->{thawedfromkey});
272 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
273 'job_uuid' => $Job->{'uuid'},
278 push @jobstep, { level => 0,
280 arvados_task => $first_task,
282 push @jobstep_todo, 0;
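# The job starts with this single level-0 task.  Any additional tasks are
# created by the running tasks themselves and are queued below as each
# child process is reaped.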
289 $build_script = <DATA>;
293 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
295 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
298 $ENV{"CRUNCH_SRC"} = $Job->{revision};
302 Log (undef, "Install revision ".$Job->{revision});
303 my $nodelist = join(",", @node);
305 # Clean out crunch_tmp/work and crunch_tmp/opt
307 my $cleanpid = fork();
310 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
311 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
316 last if $cleanpid == waitpid (-1, WNOHANG);
317 freeze_if_want_freeze ($cleanpid);
318 select (undef, undef, undef, 0.1);
320 Log (undef, "Clean-work-dir exited $?");
322 # Install requested code version
325 my @srunargs = ("srun",
326 "--nodelist=$nodelist",
327 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
331 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
334 my $treeish = $Job->{'script_version'};
335 my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
336 # Todo: let script_version specify alternate repo
337 $ENV{"CRUNCH_SRC_URL"} = $repo;
339 # Create/update our clone of the remote git repo
341 if (!-d $ENV{"CRUNCH_SRC"}) {
342 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
343 or croak ("git clone $repo failed: exit ".($?>>8));
344 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
346 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
348 # If this looks like a subversion r#, look for it in git-svn commit messages
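# (git-svn appends a trailer of the form "git-svn-id: <repo-url>@1234 <uuid>"
# to each imported commit message, so grepping for "@<revision> " locates the
# corresponding git commit.)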
350 if ($treeish =~ m{^\d{1,4}$}) {
351 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
353 if ($gitlog =~ /^[a-f0-9]{40}$/) {
355 Log (undef, "Using commit $commit for revision $treeish");
359 # If that didn't work, try asking git to look it up as a tree-ish.
361 if (!defined $commit) {
363 my $cooked_treeish = $treeish;
364 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
365 # Looks like a git branch name -- make sure git knows it's
366 # relative to the remote repo
367 $cooked_treeish = "origin/$treeish";
370 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
372 if ($found =~ /^[0-9a-f]{40}$/s) {
374 if ($commit ne $treeish) {
375 # Make sure we record the real commit id in the database,
376 # frozentokey, logs, etc. -- instead of an abbreviation or a
377 # branch name which can become ambiguous or point to a
378 # different commit in the future.
379 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
380 Log (undef, "Using commit $commit for tree-ish $treeish");
381 if ($commit ne $treeish) {
382 $Job->{'script_version'} = $commit;
383 $Job->save() or croak("Error while updating job");
389 if (defined $commit) {
390 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
391 @execargs = ("sh", "-c",
392 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
395 croak ("could not figure out commit id for $treeish");
398 my $installpid = fork();
399 if ($installpid == 0)
401 srun (\@srunargs, \@execargs, {}, $build_script);
406 last if $installpid == waitpid (-1, WNOHANG);
407 freeze_if_want_freeze ($installpid);
408 select (undef, undef, undef, 0.1);
410 Log (undef, "Install exited $?");
415 foreach (qw (script script_version script_parameters resource_limits))
417 Log (undef, $_ . " " . $Job->{$_});
419 foreach (split (/\n/, $Job->{knobs}))
421 Log (undef, "knob " . $_);
432 my $thisround_succeeded = 0;
433 my $thisround_failed = 0;
434 my $thisround_failed_multiple = 0;
436 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
437 or $a <=> $b } @jobstep_todo;
438 my $level = $jobstep[$jobstep_todo[0]]->{level};
439 Log (undef, "start level $level");
444 my @freeslot = (0..$#slot);
447 my $progress_is_dirty = 1;
448 my $progress_stats_updated = 0;
450 update_progress_stats();
455 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
457 $main::please_continue = 0;
459 my $id = $jobstep_todo[$todo_ptr];
460 my $Jobstep = $jobstep[$id];
461 if ($Jobstep->{level} != $level)
465 if ($Jobstep->{attempts} > 9)
467 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
472 pipe $reader{$id}, "writer" or croak ($!);
473 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
474 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
476 my $childslot = $freeslot[0];
477 my $childnode = $slot[$childslot]->{node};
478 my $childslotname = join (".",
479 $slot[$childslot]->{node}->{name},
480 $slot[$childslot]->{cpu});
481 my $childpid = fork();
484 $SIG{'INT'} = 'DEFAULT';
485 $SIG{'QUIT'} = 'DEFAULT';
486 $SIG{'TERM'} = 'DEFAULT';
488 foreach (values (%reader))
492 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
493 open(STDOUT,">&writer");
494 open(STDERR,">&writer");
499 delete $ENV{"GNUPGHOME"};
500 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
501 $ENV{"TASK_QSEQUENCE"} = $id;
502 $ENV{"TASK_SEQUENCE"} = $level;
503 $ENV{"JOB_SCRIPT"} = $Job->{script};
504 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
505 $param =~ tr/a-z/A-Z/;
506 $ENV{"JOB_PARAMETER_$param"} = $value;
508 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
509 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
510 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
511 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
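# Together with CRUNCH_JOB_UUID/JOB_UUID set above, these variables make up
# the environment the task script sees; each script_parameter is exported as
# JOB_PARAMETER_<NAME> with its name uppercased.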
517 "--nodelist=".$childnode->{name},
518 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
519 "--job-name=$job_id.$id.$$",
521 my @execargs = qw(sh);
522 my $build_script_to_send = "";
524 "mkdir -p $ENV{CRUNCH_TMP}/revision "
525 ."&& cd $ENV{CRUNCH_TMP} ";
528 $build_script_to_send = $build_script;
532 elsif (!$skip_install)
537 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
539 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
540 ." && ./installrevision "
544 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
546 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
547 my @execargs = ('bash', '-c', $command);
548 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
552 if (!defined $childpid)
559 $proc{$childpid} = { jobstep => $id,
562 jobstepname => "$job_id.$id.$childpid",
564 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
565 $slot[$childslot]->{pid} = $childpid;
567 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
568 Log ($id, "child $childpid started on $childslotname");
569 $Jobstep->{attempts} ++;
570 $Jobstep->{starttime} = time;
571 $Jobstep->{node} = $childnode->{name};
572 $Jobstep->{slotindex} = $childslot;
573 delete $Jobstep->{stderr};
574 delete $Jobstep->{finishtime};
576 splice @jobstep_todo, $todo_ptr, 1;
579 $progress_is_dirty = 1;
583 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
585 last THISROUND if $main::please_freeze;
586 if ($main::please_info)
588 $main::please_info = 0;
592 update_progress_stats();
600 update_progress_stats();
601 select (undef, undef, undef, 0.1);
603 elsif (time - $progress_stats_updated >= 30)
605 update_progress_stats();
607 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
608 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
610 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
611 .($thisround_failed+$thisround_succeeded)
612 .") -- giving up on this round";
613 Log (undef, $message);
617 # move slots from freeslot to holdslot (or back to freeslot) if necessary
618 for (my $i=$#freeslot; $i>=0; $i--) {
619 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
620 push @holdslot, (splice @freeslot, $i, 1);
623 for (my $i=$#holdslot; $i>=0; $i--) {
624 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
625 push @freeslot, (splice @holdslot, $i, 1);
629 # give up if no nodes are succeeding
630 if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
631 my $message = "Every node has failed -- giving up on this round";
632 Log (undef, $message);
639 push @freeslot, splice @holdslot;
640 $slot[$_]->{node}->{losing_streak} = 0 foreach @freeslot;
643 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
646 goto THISROUND if $main::please_continue;
647 if ($main::please_info) { $main::please_info = 0; freeze(); collate_output(); save_meta(1); }
652 update_progress_stats();
653 select (undef, undef, undef, 0.1);
654 killem (keys %proc) if $main::please_freeze;
658 update_progress_stats();
659 freeze_if_want_freeze();
662 if (!defined $success)
665 $thisround_succeeded == 0 &&
666 ($thisround_failed == 0 || $thisround_failed > 4))
668 my $message = "stop because $thisround_failed tasks failed and none succeeded";
669 Log (undef, $message);
678 goto ONELEVEL if !defined $success;
681 release_allocation();
683 $Job->{'output'} = &collate_output();
684 $Job->{'success'} = 0 if !$Job->{'output'};
687 if ($Job->{'output'})
689 $arv->{'collections'}->{'create'}->execute('collection' => {
690 'uuid' => $Job->{'output'},
691 'manifest_text' => `whget $Job->{'output'}`,
696 Log (undef, "finish");
698 $Job->{'success'} = $Job->{'output'} && $success;
706 sub update_progress_stats
708 $progress_stats_updated = time;
709 return if !$progress_is_dirty;
710 my ($todo, $done, $running) = (scalar @jobstep_todo,
711 scalar @jobstep_done,
712 scalar @slot - scalar @freeslot - scalar @holdslot);
713 $Job->{'tasks_summary'} ||= {};
714 $Job->{'tasks_summary'}->{'todo'} = $todo;
715 $Job->{'tasks_summary'}->{'done'} = $done;
716 $Job->{'tasks_summary'}->{'running'} = $running;
718 Log (undef, "status: $done done, $running running, $todo todo");
719 $progress_is_dirty = 0;
726 my $pid = waitpid (-1, WNOHANG);
727 return 0 if $pid <= 0;
729 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
731 . $slot[$proc{$pid}->{slot}]->{cpu});
732 my $jobstepid = $proc{$pid}->{jobstep};
733 my $elapsed = time - $proc{$pid}->{time};
734 my $Jobstep = $jobstep[$jobstepid];
737 my $exitinfo = "exit $exitcode";
738 $Jobstep->{arvados_task}->reload;
739 my $success = $Jobstep->{arvados_task}->{success};
741 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
743 if (!defined $success) {
744 # task did not indicate one way or the other --> fail
745 $Jobstep->{arvados_task}->{success} = 0;
746 $Jobstep->{arvados_task}->save;
752 --$Jobstep->{attempts} if $Jobstep->{node_fail};
754 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
756 # Check for signs of a failed or misconfigured node
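# (A node becomes suspect once its losing_streak reaches ncpus+2 consecutive
# failures; ban_node_by_slot() then keeps new jobsteps off it for 60 seconds.)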
757 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
758 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
759 # Don't count this against jobstep failure thresholds if this
760 # node is already suspected faulty and srun exited quickly
761 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
763 $Jobstep->{attempts} > 1) {
764 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
765 --$Jobstep->{attempts};
767 ban_node_by_slot($proc{$pid}->{slot});
770 push @jobstep_todo, $jobstepid;
771 Log ($jobstepid, "failure in $elapsed seconds");
772 $Job->{'tasks_summary'}->{'failed'}++;
776 ++$thisround_succeeded;
777 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
778 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
779 push @jobstep_done, $jobstepid;
780 Log ($jobstepid, "success in $elapsed seconds");
782 $Jobstep->{exitcode} = $exitcode;
783 $Jobstep->{finishtime} = time;
784 process_stderr ($jobstepid, $success);
785 Log ($jobstepid, "output " . $Jobstep->{arvados_task}->{output});
787 close $reader{$jobstepid};
788 delete $reader{$jobstepid};
789 delete $slot[$proc{$pid}->{slot}]->{pid};
790 push @freeslot, $proc{$pid}->{slot};
794 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
796 'created_by_job_task' => $Jobstep->{arvados_task}->{uuid}
798 'order' => 'qsequence'
800 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
802 'level' => $arvados_task->{'sequence'},
804 'arvados_task' => $arvados_task
806 push @jobstep, $jobstep;
807 push @jobstep_todo, $#jobstep;
810 $progress_is_dirty = 1;
817 # return if the kill list was checked <4 seconds ago
818 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
822 $squeue_kill_checked = time;
824 # use killem() on procs whose killtime is reached
827 if (exists $proc{$_}->{killtime}
828 && $proc{$_}->{killtime} <= time)
834 # return if the squeue was checked <60 seconds ago
835 if (defined $squeue_checked && $squeue_checked > time - 60)
839 $squeue_checked = time;
843 # here is an opportunity to check for mysterious problems with local procs
847 # get a list of steps still running
848 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
850 if ($squeue[-1] ne "ok")
856 # which of my jobsteps are running, according to squeue?
860 if (/^(\d+)\.(\d+) (\S+)/)
862 if ($1 eq $ENV{SLURM_JOBID})
869 # which of my active child procs (>60s old) were not mentioned by squeue?
872 if ($proc{$_}->{time} < time - 60
873 && !exists $ok{$proc{$_}->{jobstepname}}
874 && !exists $proc{$_}->{killtime})
876 # kill this proc if it hasn't exited in 30 seconds
877 $proc{$_}->{killtime} = time + 30;
883 sub release_allocation
887 Log (undef, "release job allocation");
888 system "scancel $ENV{SLURM_JOBID}";
896 foreach my $job (keys %reader)
899 while (0 < sysread ($reader{$job}, $buf, 8192))
901 print STDERR $buf if $ENV{CRUNCH_DEBUG};
902 $jobstep[$job]->{stderr} .= $buf;
903 preprocess_stderr ($job);
904 if (length ($jobstep[$job]->{stderr}) > 16384)
906 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
915 sub preprocess_stderr
919 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
921 if ($line =~ /\+\+\+mr/) {
924 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
925 Log ($job, "stderr $line");
926 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
928 $main::please_freeze = 1;
930 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
931 $jobstep[$job]->{node_fail} = 1;
932 ban_node_by_slot($jobstep[$job]->{slotindex});
942 preprocess_stderr ($job);
945 Log ($job, "stderr $_");
946 } split ("\n", $jobstep[$job]->{stderr});
952 my $whc = Warehouse->new;
953 Log (undef, "collate");
954 $whc->write_start (1);
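# Combine the tasks' outputs into one manifest: output that does not look
# like a manifest key is written as literal data; a single-task job whose
# output is already a key passes it through unchanged; otherwise each output
# block is fetched and concatenated into the new manifest.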
958 next if !exists $_->{arvados_task}->{output} || $_->{exitcode} != 0;
959 my $output = $_->{arvados_task}->{output};
960 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
962 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
963 $whc->write_data ($output);
965 elsif (@jobstep == 1)
967 $joboutput = $output;
970 elsif (defined (my $outblock = $whc->fetch_block ($output)))
972 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
973 $whc->write_data ($outblock);
977 my $errstr = $whc->errstr;
978 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
982 $joboutput = $whc->write_finish if !defined $joboutput;
985 Log (undef, "output $joboutput");
986 $Job->{'output'} = $joboutput;
991 Log (undef, "output undef");
1001 my $sig = 2; # SIGINT first
1002 if (exists $proc{$_}->{"sent_$sig"} &&
1003 time - $proc{$_}->{"sent_$sig"} > 4)
1005 $sig = 15; # SIGTERM if SIGINT doesn't work
1007 if (exists $proc{$_}->{"sent_$sig"} &&
1008 time - $proc{$_}->{"sent_$sig"} > 4)
1010 $sig = 9; # SIGKILL if SIGTERM doesn't work
1012 if (!exists $proc{$_}->{"sent_$sig"})
1014 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1016 select (undef, undef, undef, 0.1);
1019 kill $sig, $_; # srun wants two SIGINT to really interrupt
1021 $proc{$_}->{"sent_$sig"} = time;
1022 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1032 vec($bits,fileno($_),1) = 1;
1038 sub Log # ($jobstep_id, $logmessage)
1040 if ($_[1] =~ /\n/) {
1041 for my $line (split (/\n/, $_[1])) {
1046 my $fh = select STDERR; $|=1; select $fh;
1047 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1048 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1051 if ($metastream || -t STDERR) {
1052 my @gmtime = gmtime;
1053 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1054 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1056 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1058 return if !$metastream;
1059 $metastream->write_data ($datetime . " " . $message);
1063 sub reconnect_database
1065 return if !$job_has_uuid;
1066 return if ($dbh && $dbh->do ("select now()"));
1069 $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1071 $dbh->{InactiveDestroy} = 1;
1074 warn ($DBI::errstr);
1077 croak ($DBI::errstr) if !$dbh;
1083 return 1 if !$job_has_uuid;
1084 my $ret = $dbh->do (@_);
1085 return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1086 reconnect_database();
1087 return $dbh->do (@_);
1093 my ($package, $file, $line) = caller;
1094 my $message = "@_ at $file line $line\n";
1095 Log (undef, $message);
1096 freeze() if @jobstep_todo;
1097 collate_output() if @jobstep_todo;
1099 save_meta() if $metastream;
1106 return if !$job_has_uuid;
1108 $Job->{'running'} = 0;
1109 $Job->{'success'} = 0;
1110 $Job->{'finished_at'} = time;
1117 my $justcheckpoint = shift; # false if this will be the last meta saved
1118 my $m = $metastream;
1119 $m = $m->copy if $justcheckpoint;
1121 my $loglocator = $m->as_key;
1122 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1123 Log (undef, "meta key is $loglocator");
1124 $Job->{'log'} = $loglocator;
1129 sub freeze_if_want_freeze
1131 if ($main::please_freeze)
1133 release_allocation();
1136 # kill some srun procs before freeze+stop
1137 map { $proc{$_} = {} } @_;
1140 killem (keys %proc);
1141 select (undef, undef, undef, 0.1);
1143 while (($died = waitpid (-1, WNOHANG)) > 0)
1145 delete $proc{$died};
1160 Log (undef, "Freeze not implemented");
1164 Log (undef, "freeze");
1166 my $freezer = new Warehouse::Stream (whc => $whc);
1168 $freezer->name (".");
1169 $freezer->write_start ("state.txt");
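# state.txt layout (read back by thaw() below): a block of key=value lines
# for the job, then one block per jobstep, separated by blank lines, with
# values escaped by freezequote(); an optional "merge" record stores
# @jobstep_tomerge and its level.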
1171 $freezer->write_data (join ("\n",
1175 $_ . "=" . freezequote($Job->{$_})
1176 } grep { $_ ne "id" } keys %$Job) . "\n\n");
1178 foreach my $Jobstep (@jobstep)
1180 my $str = join ("\n",
1183 $_ . "=" . freezequote ($Jobstep->{$_})
1185 $_ !~ /^(?:stderr|slotindex|node_fail)/
1187 $freezer->write_data ($str."\n\n");
1189 if (@jobstep_tomerge)
1191 $freezer->write_data
1192 ("merge $jobstep_tomerge_level "
1193 . freezequote (join ("\n",
1194 map { freezequote ($_) } @jobstep_tomerge))
1198 $freezer->write_finish;
1199 my $frozentokey = $freezer->as_key;
1201 Log (undef, "frozento key is $frozentokey");
1202 dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1203 $frozentokey, $job_id);
1204 my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1205 Log (undef, "frozento+K key is $kfrozentokey");
1206 return $frozentokey;
1212 croak ("Thaw not implemented");
1216 Log (undef, "thaw from $key");
1221 @jobstep_tomerge = ();
1222 $jobstep_tomerge_level = 0;
1225 my $stream = new Warehouse::Stream ( whc => $whc,
1226 hash => [split (",", $key)] );
1228 while (my $dataref = $stream->read_until (undef, "\n\n"))
1230 if ($$dataref =~ /^job /)
1232 foreach (split ("\n", $$dataref))
1234 my ($k, $v) = split ("=", $_, 2);
1235 $frozenjob->{$k} = freezeunquote ($v);
1240 if ($$dataref =~ /^merge (\d+) (.*)/)
1242 $jobstep_tomerge_level = $1;
1244 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1249 foreach (split ("\n", $$dataref))
1251 my ($k, $v) = split ("=", $_, 2);
1252 $Jobstep->{$k} = freezeunquote ($v) if $k;
1254 $Jobstep->{attempts} = 0;
1255 push @jobstep, $Jobstep;
1257 if ($Jobstep->{exitcode} eq "0")
1259 push @jobstep_done, $#jobstep;
1263 push @jobstep_todo, $#jobstep;
1267 foreach (qw (script script_version script_parameters))
1269 $Job->{$_} = $frozenjob->{$_};
1287 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1294 my $srunargs = shift;
1295 my $execargs = shift;
1296 my $opts = shift || {};
1298 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1299 print STDERR (join (" ",
1300 map { / / ? "'$_'" : $_ }
1303 if $ENV{CRUNCH_DEBUG};
1305 if (defined $stdin) {
1306 my $child = open STDIN, "-|";
1307 defined $child or die "no fork: $!";
1309 print $stdin or die $!;
1310 close STDOUT or die $!;
1315 return system (@$args) if $opts->{fork};
1318 warn "ENV size is ".length(join(" ",%ENV));
1319 die "exec failed: $!: @$args";
1323 sub ban_node_by_slot {
1324 # Don't start any new jobsteps on this node for 60 seconds
1326 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1327 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1333 # checkout-and-build
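# Runs on each compute node via srun: opens a per-checkout lock file, clones
# or updates the git tree in CRUNCH_SRC, checks out CRUNCH_SRC_COMMIT, runs
# install.sh (or tests/autotests.sh) into CRUNCH_INSTALL, and records the
# installed commit in a "$destdir.commit" symlink so a repeat run can skip
# the work.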
1337 my $destdir = $ENV{"CRUNCH_SRC"};
1338 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1339 my $repo = $ENV{"CRUNCH_SRC_URL"};
1341 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1343 if (readlink ("$destdir.commit") eq $commit) {
1347 open STDOUT, ">", "$destdir.log";
1348 open STDERR, ">&STDOUT";
1350 if (-d "$destdir/.git") {
1351 chdir $destdir or die "chdir $destdir: $!";
1352 if (0 != system (qw(git remote set-url origin), $repo)) {
1353 # awful... for old versions of git that don't know "remote set-url"
1354 shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1357 elsif ($repo && $commit)
1359 shell_or_die('git', 'clone', $repo, $destdir);
1360 chdir $destdir or die "chdir $destdir: $!";
1361 shell_or_die(qw(git config clean.requireForce false));
1364 die "$destdir does not exist, and no repo/commit specified -- giving up";
1368 unlink "$destdir.commit";
1369 shell_or_die (qw(git stash));
1370 shell_or_die (qw(git clean -d -x));
1371 shell_or_die (qw(git fetch origin));
1372 shell_or_die (qw(git checkout), $commit);
1376 chomp (my $pwd = `pwd`);
1377 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1379 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1381 shell_or_die ("./tests/autotests.sh", $install_dir);
1382 } elsif (-e "./install.sh") {
1383 shell_or_die ("./install.sh", $install_dir);
1387 unlink "$destdir.commit.new";
1388 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1389 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1398 if ($ENV{"DEBUG"}) {
1399 print STDERR "@_\n";
1402 or die "@_ failed: $! exit 0x".sprintf("%x",$?);