2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 to or removed from the allocation). Currently this is a no-op.
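Any of these signals can be delivered with kill(1) from another shell.
For example, the following requests a checkpoint followed by an orderly
shutdown (the pid lookup is illustrative, assuming a single local
crunch-job process):

  kill -INT `pgrep -f crunch-job`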
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
87 GetOptions('force-unlock' => \$force_unlock,
88 'git-dir=s' => \$git_dir,
90 'job-api-token=s' => \$job_api_token,
91 'resume-stash=s' => \$resume_stash,
94 if (defined $job_api_token) {
95 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
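# A $jobspec consisting only of lowercase letters, digits and dashes is
# taken to be the uuid of an existing job record in Arvados; anything
# else (typically a JSON object, as shown in the SYNOPSIS) is treated
# as an inline job description that has no uuid of its own yet.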
108 $main::ENV{CRUNCH_DEBUG} = 1;
112 $main::ENV{CRUNCH_DEBUG} = 0;
117 my $arv = Arvados->new;
118 my $metastream = Warehouse::Stream->new;
120 $metastream->write_start('log.txt');
129 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
130 $User = $arv->{'users'}->{'current'}->execute;
131 if (!$force_unlock) {
132 if ($Job->{'is_locked_by'}) {
133 croak("Job is locked: " . $Job->{'is_locked_by'});
135 if (defined $Job->{'success'}) {
136 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
138 if ($Job->{'running'}) {
139 croak("Job 'running' flag is already set");
141 if ($Job->{'started_at'}) {
142 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
148 $Job = JSON::decode_json($jobspec);
152 map { croak ("No $_ specified") unless $Job->{$_} }
153 qw(script script_version script_parameters);
156 if (!defined $Job->{'uuid'}) {
157 chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
160 $job_id = $Job->{'uuid'};
164 $Job->{'resource_limits'} ||= {};
165 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
166 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
169 Log (undef, "check slurm allocation");
172 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
176 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
177 push @sinfo, "$localcpus localhost";
179 if (exists $ENV{SLURM_NODELIST})
181 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
185 my ($ncpus, $slurm_nodelist) = split;
186 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
189 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
192 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
195 foreach (split (",", $ranges))
208 push @nodelist, map {
210 $n =~ s/\[[-,\d]+\]/$_/;
217 push @nodelist, $nodelist;
220 foreach my $nodename (@nodelist)
222 Log (undef, "node $nodename - $ncpus slots");
223 my $node = { name => $nodename,
227 foreach my $cpu (1..$ncpus)
229 push @slot, { node => $node,
233 push @node, @nodelist;
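# SLURM's bracket syntax has been expanded above: e.g. a nodelist entry
# of "compute[1-3,5]" (hostname illustrative) contributes compute1,
# compute2, compute3 and compute5 to @nodelist.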
238 # Ensure that we get one jobstep running on each allocated node before
239 # we start overloading nodes with concurrent steps
241 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
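# After this sort, every cpu-0 slot (one per node) comes before any
# cpu-1 slot, and so on, so tasks taken from the front of @freeslot
# spread across all nodes before doubling up on any one of them.
# E.g. with two 2-cpu nodes, both cpu-0 slots precede both cpu-1 slots.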
248 # Claim this job, and make sure nobody else does
250 $Job->{'is_locked_by'} = $User->{'uuid'};
251 $Job->{'started_at'} = gmtime;
252 $Job->{'running'} = 1;
253 $Job->{'success'} = undef;
254 $Job->{'tasks_summary'} = { 'failed' => 0,
258 unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {
259 croak("Error while updating / locking job");
264 Log (undef, "start");
265 $SIG{'INT'} = sub { $main::please_freeze = 1; };
266 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
267 $SIG{'TERM'} = \&croak;
268 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
269 $SIG{'ALRM'} = sub { $main::please_info = 1; };
270 $SIG{'CONT'} = sub { $main::please_continue = 1; };
271 $main::please_freeze = 0;
272 $main::please_info = 0;
273 $main::please_continue = 0;
274 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
276 foreach (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/; }
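# E.g. a knobs line reading "NOCACHE=1" (value illustrative) exports
# NOCACHE=1 into the environment inherited by every child process
# started below; only knob names beginning with "NOCACHE" are exported
# this way.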
277 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
278 $ENV{"JOB_UUID"} = $job_id;
282 my @jobstep_todo = ();
283 my @jobstep_done = ();
284 my @jobstep_tomerge = ();
285 my $jobstep_tomerge_level = 0;
287 my $squeue_kill_checked;
288 my $output_in_keep = 0;
292 if (defined $Job->{thawedfromkey})
294 thaw ($Job->{thawedfromkey});
298 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
299 'job_uuid' => $Job->{'uuid'},
304 push @jobstep, { 'level' => 0,
306 'arvados_task' => $first_task,
308 push @jobstep_todo, 0;
315 $build_script = <DATA>;
319 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
321 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
324 $ENV{"CRUNCH_SRC"} = $Job->{revision};
328 Log (undef, "Install revision ".$Job->{revision});
329 my $nodelist = join(",", @node);
331 # Clean out crunch_tmp/work and crunch_tmp/opt
333 my $cleanpid = fork();
336 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
337 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
342 last if $cleanpid == waitpid (-1, WNOHANG);
343 freeze_if_want_freeze ($cleanpid);
344 select (undef, undef, undef, 0.1);
346 Log (undef, "Clean-work-dir exited $?");
348 # Install requested code version
351 my @srunargs = ("srun",
352 "--nodelist=$nodelist",
353 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
355 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
356 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
357 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
360 my $treeish = $Job->{'script_version'};
361 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
362 # Todo: let script_version specify repository instead of expecting
363 # parent process to figure it out.
364 $ENV{"CRUNCH_SRC_URL"} = $repo;
366 # Create/update our clone of the remote git repo
368 if (!-d $ENV{"CRUNCH_SRC"}) {
369 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
370 or croak ("git clone $repo failed: exit ".($?>>8));
371 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
373 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
375 # If this looks like a subversion r#, look for it in git-svn commit messages
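# E.g. a script_version of "1234" (revision number illustrative) is
# resolved by grepping origin/master commit messages for a git-svn-id
# containing "@1234", and the matching commit's sha1 is used from then
# on.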
377 if ($treeish =~ m{^\d{1,4}$}) {
378 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
380 if ($gitlog =~ /^[a-f0-9]{40}$/) {
382 Log (undef, "Using commit $commit for revision $treeish");
386 # If that didn't work, try asking git to look it up as a tree-ish.
388 if (!defined $commit) {
390 my $cooked_treeish = $treeish;
391 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
392 # Looks like a git branch name -- make sure git knows it's
393 # relative to the remote repo
394 $cooked_treeish = "origin/$treeish";
397 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
399 if ($found =~ /^[0-9a-f]{40}$/s) {
401 if ($commit ne $treeish) {
402 # Make sure we record the real commit id in the database,
403 # frozentokey, logs, etc. -- instead of an abbreviation or a
404 # branch name which can become ambiguous or point to a
405 # different commit in the future.
406 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
407 Log (undef, "Using commit $commit for tree-ish $treeish");
408 if ($commit ne $treeish) {
409 $Job->{'script_version'} = $commit;
410 $Job->save() or croak("Error while updating job");
416 if (defined $commit) {
417 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
418 @execargs = ("sh", "-c",
419 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
422 croak ("could not figure out commit id for $treeish");
425 my $installpid = fork();
426 if ($installpid == 0)
428 srun (\@srunargs, \@execargs, {}, $build_script);
433 last if $installpid == waitpid (-1, WNOHANG);
434 freeze_if_want_freeze ($installpid);
435 select (undef, undef, undef, 0.1);
437 Log (undef, "Install exited $?");
442 foreach (qw (script script_version script_parameters resource_limits))
446 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
448 foreach (split (/\n/, $Job->{knobs}))
450 Log (undef, "knob " . $_);
461 my $thisround_succeeded = 0;
462 my $thisround_failed = 0;
463 my $thisround_failed_multiple = 0;
465 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
466 or $a <=> $b } @jobstep_todo;
467 my $level = $jobstep[$jobstep_todo[0]]->{level};
468 Log (undef, "start level $level");
473 my @freeslot = (0..$#slot);
476 my $progress_is_dirty = 1;
477 my $progress_stats_updated = 0;
479 update_progress_stats();
484 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
486 $main::please_continue = 0;
488 my $id = $jobstep_todo[$todo_ptr];
489 my $Jobstep = $jobstep[$id];
490 if ($Jobstep->{level} != $level)
494 if ($Jobstep->{attempts} > 9)
496 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
501 pipe $reader{$id}, "writer" or croak ($!);
502 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
503 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
505 my $childslot = $freeslot[0];
506 my $childnode = $slot[$childslot]->{node};
507 my $childslotname = join (".",
508 $slot[$childslot]->{node}->{name},
509 $slot[$childslot]->{cpu});
510 my $childpid = fork();
513 $SIG{'INT'} = 'DEFAULT';
514 $SIG{'QUIT'} = 'DEFAULT';
515 $SIG{'TERM'} = 'DEFAULT';
517 foreach (values (%reader))
521 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
522 open(STDOUT,">&writer");
523 open(STDERR,">&writer");
528 delete $ENV{"GNUPGHOME"};
529 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
530 $ENV{"TASK_QSEQUENCE"} = $id;
531 $ENV{"TASK_SEQUENCE"} = $level;
532 $ENV{"JOB_SCRIPT"} = $Job->{script};
533 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
534 $param =~ tr/a-z/A-Z/;
535 $ENV{"JOB_PARAMETER_$param"} = $value;
537 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
538 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
539 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
540 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
546 "--nodelist=".$childnode->{name},
547 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
548 "--job-name=$job_id.$id.$$",
550 my @execargs = qw(sh);
551 my $build_script_to_send = "";
553 "mkdir -p $ENV{CRUNCH_TMP}/revision "
554 ."&& cd $ENV{CRUNCH_TMP} ";
557 $build_script_to_send = $build_script;
561 elsif (!$skip_install)
566 ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
568 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
569 ." && ./installrevision "
573 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
575 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
576 my @execargs = ('bash', '-c', $command);
577 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
581 if (!defined $childpid)
588 $proc{$childpid} = { jobstep => $id,
591 jobstepname => "$job_id.$id.$childpid",
593 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
594 $slot[$childslot]->{pid} = $childpid;
596 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
597 Log ($id, "child $childpid started on $childslotname");
598 $Jobstep->{attempts} ++;
599 $Jobstep->{starttime} = time;
600 $Jobstep->{node} = $childnode->{name};
601 $Jobstep->{slotindex} = $childslot;
602 delete $Jobstep->{stderr};
603 delete $Jobstep->{finishtime};
605 splice @jobstep_todo, $todo_ptr, 1;
608 $progress_is_dirty = 1;
612 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
614 last THISROUND if $main::please_freeze;
615 if ($main::please_info)
617 $main::please_info = 0;
621 update_progress_stats();
629 update_progress_stats();
630 select (undef, undef, undef, 0.1);
632 elsif (time - $progress_stats_updated >= 30)
634 update_progress_stats();
636 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
637 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
639 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
640 .($thisround_failed+$thisround_succeeded)
641 .") -- giving up on this round";
642 Log (undef, $message);
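# (The test above gives up once 8 tasks have failed on a repeat attempt
# with no successes this round, or once such repeat failures reach 16
# and outnumber this round's successes.)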
646 # move slots from freeslot to holdslot (or back to freeslot) if necessary
647 for (my $i=$#freeslot; $i>=0; $i--) {
648 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
649 push @holdslot, (splice @freeslot, $i, 1);
652 for (my $i=$#holdslot; $i>=0; $i--) {
653 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
654 push @freeslot, (splice @holdslot, $i, 1);
658 # give up if no nodes are succeeding
659 if (!grep { $_->{node}->{losing_streak} == 0 &&
660 $_->{node}->{hold_count} < 4 } @slot) {
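# i.e. stop when every slot's node either has a non-zero losing streak
# or has been put on hold 4 or more times.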
661 my $message = "Every node has failed -- giving up on this round";
662 Log (undef, $message);
669 push @freeslot, splice @holdslot;
670 $slot[$_]->{node}->{losing_streak} = 0 foreach @freeslot;
673 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
676 goto THISROUND if $main::please_continue;
677 if ($main::please_info) { $main::please_info = 0; freeze(); collate_output(); save_meta(1); }
682 update_progress_stats();
683 select (undef, undef, undef, 0.1);
684 killem (keys %proc) if $main::please_freeze;
688 update_progress_stats();
689 freeze_if_want_freeze();
692 if (!defined $success)
695 $thisround_succeeded == 0 &&
696 ($thisround_failed == 0 || $thisround_failed > 4))
698 my $message = "stop because $thisround_failed tasks failed and none succeeded";
699 Log (undef, $message);
708 goto ONELEVEL if !defined $success;
711 release_allocation();
713 $Job->{'output'} = &collate_output();
714 $Job->{'success'} = $Job->{'output'} && $success;
717 if ($Job->{'output'})
720 my $manifest_text = capturex("whget", $Job->{'output'});
721 $arv->{'collections'}->{'create'}->execute('collection' => {
722 'uuid' => $Job->{'output'},
723 'manifest_text' => $manifest_text,
727 Log (undef, "Failed to register output manifest: $@");
731 Log (undef, "finish");
738 sub update_progress_stats
740 $progress_stats_updated = time;
741 return if !$progress_is_dirty;
742 my ($todo, $done, $running) = (scalar @jobstep_todo,
743 scalar @jobstep_done,
744 scalar @slot - scalar @freeslot - scalar @holdslot);
745 $Job->{'tasks_summary'} ||= {};
746 $Job->{'tasks_summary'}->{'todo'} = $todo;
747 $Job->{'tasks_summary'}->{'done'} = $done;
748 $Job->{'tasks_summary'}->{'running'} = $running;
750 Log (undef, "status: $done done, $running running, $todo todo");
751 $progress_is_dirty = 0;
758 my $pid = waitpid (-1, WNOHANG);
759 return 0 if $pid <= 0;
761 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
763 . $slot[$proc{$pid}->{slot}]->{cpu});
764 my $jobstepid = $proc{$pid}->{jobstep};
765 my $elapsed = time - $proc{$pid}->{time};
766 my $Jobstep = $jobstep[$jobstepid];
769 my $exitinfo = "exit $exitcode";
770 $Jobstep->{'arvados_task'}->reload;
771 my $success = $Jobstep->{'arvados_task'}->{success};
773 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
775 if (!defined $success) {
776 # task did not indicate one way or the other --> fail
777 $Jobstep->{'arvados_task'}->{success} = 0;
778 $Jobstep->{'arvados_task'}->save;
784 my $no_incr_attempts;
785 $no_incr_attempts = 1 if $Jobstep->{node_fail};
788 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
790 # Check for signs of a failed or misconfigured node
791 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
792 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
793 # Don't count this against jobstep failure thresholds if this
794 # node is already suspected faulty and srun exited quickly
795 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
797 $Jobstep->{attempts} > 1) {
798 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
799 $no_incr_attempts = 1;
800 --$Jobstep->{attempts};
802 ban_node_by_slot($proc{$pid}->{slot});
805 push @jobstep_todo, $jobstepid;
806 Log ($jobstepid, "failure in $elapsed seconds");
808 --$Jobstep->{attempts} if $no_incr_attempts;
809 $Job->{'tasks_summary'}->{'failed'}++;
813 ++$thisround_succeeded;
814 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
815 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
816 push @jobstep_done, $jobstepid;
817 Log ($jobstepid, "success in $elapsed seconds");
819 $Jobstep->{exitcode} = $exitcode;
820 $Jobstep->{finishtime} = time;
821 process_stderr ($jobstepid, $success);
822 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
824 close $reader{$jobstepid};
825 delete $reader{$jobstepid};
826 delete $slot[$proc{$pid}->{slot}]->{pid};
827 push @freeslot, $proc{$pid}->{slot};
831 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
833 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
835 'order' => 'qsequence'
837 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
839 'level' => $arvados_task->{'sequence'},
841 'arvados_task' => $arvados_task
843 push @jobstep, $jobstep;
844 push @jobstep_todo, $#jobstep;
847 $progress_is_dirty = 1;
854 # return if the kill list was checked <4 seconds ago
855 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
859 $squeue_kill_checked = time;
861 # use killem() on procs whose killtime is reached
864 if (exists $proc{$_}->{killtime}
865 && $proc{$_}->{killtime} <= time)
871 # return if the squeue was checked <60 seconds ago
872 if (defined $squeue_checked && $squeue_checked > time - 60)
876 $squeue_checked = time;
880 # here is an opportunity to check for mysterious problems with local procs
884 # get a list of steps still running
885 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
887 if ($squeue[-1] ne "ok")
893 # which of my jobsteps are running, according to squeue?
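# With -o '%i %j', each squeue line is "<slurmjobid>.<stepid> <name>"
# (which is what the regex below picks apart); <name> is the --job-name
# we handed to srun when the task was started, so it can be compared
# with the jobstepname recorded in %proc.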
897 if (/^(\d+)\.(\d+) (\S+)/)
899 if ($1 eq $ENV{SLURM_JOBID})
906 # which of my active child procs (>60s old) were not mentioned by squeue?
909 if ($proc{$_}->{time} < time - 60
910 && !exists $ok{$proc{$_}->{jobstepname}}
911 && !exists $proc{$_}->{killtime})
913 # kill this proc if it hasn't exited in 30 seconds
914 $proc{$_}->{killtime} = time + 30;
920 sub release_allocation
924 Log (undef, "release job allocation");
925 system "scancel $ENV{SLURM_JOBID}";
933 foreach my $job (keys %reader)
936 while (0 < sysread ($reader{$job}, $buf, 8192))
938 print STDERR $buf if $ENV{CRUNCH_DEBUG};
939 $jobstep[$job]->{stderr} .= $buf;
940 preprocess_stderr ($job);
941 if (length ($jobstep[$job]->{stderr}) > 16384)
943 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
952 sub preprocess_stderr
956 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
958 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
959 Log ($job, "stderr $line");
960 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
962 $main::please_freeze = 1;
964 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
965 $jobstep[$job]->{node_fail} = 1;
966 ban_node_by_slot($jobstep[$job]->{slotindex});
976 preprocess_stderr ($job);
979 Log ($job, "stderr $_");
980 } split ("\n", $jobstep[$job]->{stderr});
986 my $whc = Warehouse->new;
987 Log (undef, "collate");
988 $whc->write_start (1);
992 next if (!exists $_->{'arvados_task'}->{output} ||
993 !$_->{'arvados_task'}->{'success'} ||
994 $_->{'exitcode'} != 0);
995 my $output = $_->{'arvados_task'}->{output};
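# $output is either a bare Keep locator -- 32 hex digits plus optional
# "+" hints, e.g. "d41d8cd98f00b204e9800998ecf8427e+0" (an illustrative
# empty block) -- or literal manifest text; the regex below tells the
# two cases apart.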
996 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
998 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
999 $whc->write_data ($output);
1001 elsif (@jobstep == 1)
1003 $joboutput = $output;
1006 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1008 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1009 $whc->write_data ($outblock);
1013 my $errstr = $whc->errstr;
1014 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1018 $joboutput = $whc->write_finish if !defined $joboutput;
1021 Log (undef, "output $joboutput");
1022 $Job->{'output'} = $joboutput;
1027 Log (undef, "output undef");
1037 my $sig = 2; # SIGINT first
1038 if (exists $proc{$_}->{"sent_$sig"} &&
1039 time - $proc{$_}->{"sent_$sig"} > 4)
1041 $sig = 15; # SIGTERM if SIGINT doesn't work
1043 if (exists $proc{$_}->{"sent_$sig"} &&
1044 time - $proc{$_}->{"sent_$sig"} > 4)
1046 $sig = 9; # SIGKILL if SIGTERM doesn't work
1048 if (!exists $proc{$_}->{"sent_$sig"})
1050 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1052 select (undef, undef, undef, 0.1);
1055 kill $sig, $_; # srun wants two SIGINT to really interrupt
1057 $proc{$_}->{"sent_$sig"} = time;
1058 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1068 vec($bits,fileno($_),1) = 1;
1074 sub Log # ($jobstep_id, $logmessage)
1076 if ($_[1] =~ /\n/) {
1077 for my $line (split (/\n/, $_[1])) {
1082 my $fh = select STDERR; $|=1; select $fh;
1083 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1084 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1087 if ($metastream || -t STDERR) {
1088 my @gmtime = gmtime;
1089 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1090 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1092 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1094 return if !$metastream;
1095 $metastream->write_data ($datetime . " " . $message);
1101 my ($package, $file, $line) = caller;
1102 my $message = "@_ at $file line $line\n";
1103 Log (undef, $message);
1104 freeze() if @jobstep_todo;
1105 collate_output() if @jobstep_todo;
1107 save_meta() if $metastream;
1114 return if !$job_has_uuid;
1116 $Job->{'running'} = 0;
1117 $Job->{'success'} = 0;
1118 $Job->{'finished_at'} = gmtime;
1125 my $justcheckpoint = shift; # false if this will be the last meta saved
1126 my $m = $metastream;
1127 $m = $m->copy if $justcheckpoint;
1129 my $loglocator = $m->as_key;
1130 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1131 Log (undef, "meta key is $loglocator");
1132 $Job->{'log'} = $loglocator;
1137 sub freeze_if_want_freeze
1139 if ($main::please_freeze)
1141 release_allocation();
1144 # kill some srun procs before freeze+stop
1145 $proc{$_} = {} foreach @_;
1148 killem (keys %proc);
1149 select (undef, undef, undef, 0.1);
1151 while (($died = waitpid (-1, WNOHANG)) > 0)
1153 delete $proc{$died};
1168 Log (undef, "Freeze not implemented");
1175 croak ("Thaw not implemented");
1179 Log (undef, "thaw from $key");
1184 @jobstep_tomerge = ();
1185 $jobstep_tomerge_level = 0;
1188 my $stream = new Warehouse::Stream ( whc => $whc,
1189 hash => [split (",", $key)] );
1191 while (my $dataref = $stream->read_until (undef, "\n\n"))
1193 if ($$dataref =~ /^job /)
1195 foreach (split ("\n", $$dataref))
1197 my ($k, $v) = split ("=", $_, 2);
1198 $frozenjob->{$k} = freezeunquote ($v);
1203 if ($$dataref =~ /^merge (\d+) (.*)/)
1205 $jobstep_tomerge_level = $1;
1207 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1212 foreach (split ("\n", $$dataref))
1214 my ($k, $v) = split ("=", $_, 2);
1215 $Jobstep->{$k} = freezeunquote ($v) if $k;
1217 $Jobstep->{attempts} = 0;
1218 push @jobstep, $Jobstep;
1220 if ($Jobstep->{exitcode} eq "0")
1222 push @jobstep_done, $#jobstep;
1226 push @jobstep_todo, $#jobstep;
1230 foreach (qw (script script_version script_parameters))
1232 $Job->{$_} = $frozenjob->{$_};
1250 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
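# The substitution above undoes the backslash quoting applied when this
# state was frozen: "\n" becomes a real newline, and any other
# backslashed character (including "\\") becomes the character itself.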
1257 my $srunargs = shift;
1258 my $execargs = shift;
1259 my $opts = shift || {};
1261 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1262 print STDERR (join (" ",
1263 map { / / ? "'$_'" : $_ }
1266 if $ENV{CRUNCH_DEBUG};
1268 if (defined $stdin) {
1269 my $child = open STDIN, "-|";
1270 defined $child or die "no fork: $!";
1272 print $stdin or die $!;
1273 close STDOUT or die $!;
1278 return system (@$args) if $opts->{fork};
1281 warn "ENV size is ".length(join(" ",%ENV));
1282 die "exec failed: $!: @$args";
1286 sub ban_node_by_slot {
1287 # Don't start any new jobsteps on this node for 60 seconds
1289 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1290 $slot[$slotid]->{node}->{hold_count}++;
1291 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1297 # checkout-and-build
1301 my $destdir = $ENV{"CRUNCH_SRC"};
1302 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1303 my $repo = $ENV{"CRUNCH_SRC_URL"};
1305 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1307 if ((readlink ("$destdir.commit") || "") eq $commit) {
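# "$destdir.commit" is a symlink whose target records the commit most
# recently installed here (it is created by the symlink/rename near the
# end of this script); comparing its target with $commit detects
# whether the requested version is already built.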
1311 open STDOUT, ">", "$destdir.log";
1312 open STDERR, ">&STDOUT";
1314 if (-d "$destdir/.git") {
1315 chdir $destdir or die "chdir $destdir: $!";
1316 if (0 != system (qw(git remote set-url origin), $repo)) {
1317 # awful... for old versions of git that don't know "remote set-url"
1318 shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1321 elsif ($repo && $commit)
1323 shell_or_die('git', 'clone', $repo, $destdir);
1324 chdir $destdir or die "chdir $destdir: $!";
1325 shell_or_die(qw(git config clean.requireForce false));
1328 die "$destdir does not exist, and no repo/commit specified -- giving up";
1332 unlink "$destdir.commit";
1333 shell_or_die (qw(git stash));
1334 shell_or_die (qw(git clean -d -x));
1335 shell_or_die (qw(git fetch origin));
1336 shell_or_die (qw(git checkout), $commit);
1340 chomp ($pwd = `pwd`);
1341 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1343 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1345 shell_or_die ("./tests/autotests.sh", $install_dir);
1346 } elsif (-e "./install.sh") {
1347 shell_or_die ("./install.sh", $install_dir);
1351 unlink "$destdir.commit.new";
1352 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1353 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1362 if ($ENV{"DEBUG"}) {
1363 print STDERR "@_\n";
1366 or die "@_ failed: $! exit 0x".sprintf("%x",$?);