2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
87 GetOptions('force-unlock' => \$force_unlock,
88 'git-dir=s' => \$git_dir,
90 'job-api-token=s' => \$job_api_token,
91 'resume-stash=s' => \$resume_stash,
94 if (defined $job_api_token) {
95 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
108 $main::ENV{CRUNCH_DEBUG} = 1;
112 $main::ENV{CRUNCH_DEBUG} = 0;
117 my $arv = Arvados->new;
# Build the log stream with explicit method-call syntax; the indirect-object
# form ("new Warehouse") is ambiguous to the parser.
118 my $metastream = Warehouse::Stream->new(whc => Warehouse->new);
120 $metastream->write_start('log.txt');
129 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
130 $User = $arv->{'users'}->{'current'}->execute;
131 if (!$force_unlock) {
132 if ($Job->{'is_locked_by'}) {
133 croak("Job is locked: " . $Job->{'is_locked_by'});
# A job whose 'success' flag holds any value (even 0) must not be started
# again. Test definedness directly instead of string-comparing against undef,
# which only compares against "" and emits a warning.
135 if (defined $Job->{'success'}) {
136 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
138 if ($Job->{'running'}) {
139 croak("Job 'running' flag is already set");
141 if ($Job->{'started_at'}) {
142 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
148 $Job = JSON::decode_json($jobspec);
152 map { croak ("No $_ specified") unless $Job->{$_} }
153 qw(script script_version script_parameters);
156 if (!defined $Job->{'uuid'}) {
157 my $hostname = `hostname -s`;
159 $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
162 $job_id = $Job->{'uuid'};
166 $Job->{'resource_limits'} ||= {};
167 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
168 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
171 Log (undef, "check slurm allocation");
174 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
178 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
179 push @sinfo, "$localcpus localhost";
181 if (exists $ENV{SLURM_NODELIST})
183 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
187 my ($ncpus, $slurm_nodelist) = split;
188 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
191 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
194 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
197 foreach (split (",", $ranges))
210 push @nodelist, map {
212 $n =~ s/\[[-,\d]+\]/$_/;
219 push @nodelist, $nodelist;
222 foreach my $nodename (@nodelist)
224 Log (undef, "node $nodename - $ncpus slots");
225 my $node = { name => $nodename,
229 foreach my $cpu (1..$ncpus)
231 push @slot, { node => $node,
235 push @node, @nodelist;
240 # Ensure that we get one jobstep running on each allocated node before
241 # we start overloading nodes with concurrent steps
243 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
250 # Claim this job, and make sure nobody else does
252 $Job->{'is_locked_by'} = $User->{'uuid'};
253 $Job->{'started_at'} = gmtime;
254 $Job->{'running'} = 1;
255 $Job->{'success'} = undef;
256 $Job->{'tasks_summary'} = { 'failed' => 0,
# UUIDs are strings, so they must be compared with eq. Numeric == coerces both
# sides to numbers and can report a match for two different UUIDs, which would
# let this process believe it holds a lock owned by someone else.
261 unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {
262 croak("Error while updating / locking job");
268 Log (undef, "start");
# Install signal handlers. Apart from TERM, each handler only records the
# request in a $main:: flag.
# INT, QUIT and TSTP all raise the same "please freeze" flag.
269 $SIG{'INT'} = sub { $main::please_freeze = 1; };
270 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
# TERM is routed directly to croak().
271 $SIG{'TERM'} = \&croak;
272 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
# ALRM raises the "please info" flag; CONT raises the "please continue" flag.
273 $SIG{'ALRM'} = sub { $main::please_info = 1; };
274 $SIG{'CONT'} = sub { $main::please_continue = 1; };
# Start with all three request flags cleared.
# NOTE(review): the flags are reset after the handlers are already installed,
# so a signal delivered in between would be forgotten -- confirm whether this
# window matters.
275 $main::please_freeze = 0;
276 $main::please_info = 0;
277 $main::please_continue = 0;
278 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every NOCACHE* knob (one "NAME=value" pair per line of the job's knobs
# text) into the environment. A plain loop is used because the result list is
# not needed; grep in void context was being used only for its side effect.
280 for (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ }
281 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
282 $ENV{"JOB_UUID"} = $job_id;
286 my @jobstep_todo = ();
287 my @jobstep_done = ();
288 my @jobstep_tomerge = ();
289 my $jobstep_tomerge_level = 0;
291 my $squeue_kill_checked;
292 my $output_in_keep = 0;
296 if (defined $Job->{thawedfromkey})
298 thaw ($Job->{thawedfromkey});
302 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
303 'job_uuid' => $Job->{'uuid'},
308 push @jobstep, { 'level' => 0,
310 'arvados_task' => $first_task,
312 push @jobstep_todo, 0;
319 $build_script = <DATA>;
323 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
325 my $skip_install = (!$job_has_uuid && $Job->{script_version} =~ m{^/});
328 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
332 Log (undef, "Install revision ".$Job->{script_version});
333 my $nodelist = join(",", @node);
335 # Clean out crunch_tmp/work and crunch_tmp/opt
337 my $cleanpid = fork();
340 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
341 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
346 last if $cleanpid == waitpid (-1, WNOHANG);
347 freeze_if_want_freeze ($cleanpid);
348 select (undef, undef, undef, 0.1);
350 Log (undef, "Clean-work-dir exited $?");
352 # Install requested code version
355 my @srunargs = ("srun",
356 "--nodelist=$nodelist",
357 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
359 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
360 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
361 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
365 my $treeish = $Job->{'script_version'};
366 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
367 # Todo: let script_version specify repository instead of expecting
368 # parent process to figure it out.
369 $ENV{"CRUNCH_SRC_URL"} = $repo;
371 # Create/update our clone of the remote git repo
373 if (!-d $ENV{"CRUNCH_SRC"}) {
374 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
375 or croak ("git clone $repo failed: exit ".($?>>8));
376 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
378 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
380 # If this looks like a subversion r#, look for it in git-svn commit messages
382 if ($treeish =~ m{^\d{1,4}$}) {
383 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
385 if ($gitlog =~ /^[a-f0-9]{40}$/) {
387 Log (undef, "Using commit $commit for script_version $treeish");
391 # If that didn't work, try asking git to look it up as a tree-ish.
393 if (!defined $commit) {
395 my $cooked_treeish = $treeish;
396 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
397 # Looks like a git branch name -- make sure git knows it's
398 # relative to the remote repo
399 $cooked_treeish = "origin/$treeish";
402 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
404 if ($found =~ /^[0-9a-f]{40}$/s) {
406 if ($commit ne $treeish) {
407 # Make sure we record the real commit id in the database,
408 # frozentokey, logs, etc. -- instead of an abbreviation or a
409 # branch name which can become ambiguous or point to a
410 # different commit in the future.
411 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
412 Log (undef, "Using commit $commit for tree-ish $treeish");
413 if ($commit ne $treeish) {
414 $Job->{'script_version'} = $commit;
415 !$job_has_uuid or $Job->save() or croak("Error while updating job");
421 if (defined $commit) {
422 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
423 @execargs = ("sh", "-c",
424 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
425 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
428 croak ("could not figure out commit id for $treeish");
431 my $installpid = fork();
432 if ($installpid == 0)
434 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
439 last if $installpid == waitpid (-1, WNOHANG);
440 freeze_if_want_freeze ($installpid);
441 select (undef, undef, undef, 0.1);
443 Log (undef, "Install exited $?");
448 foreach (qw (script script_version script_parameters resource_limits))
452 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
454 foreach (split (/\n/, $Job->{knobs}))
456 Log (undef, "knob " . $_);
467 my $thisround_succeeded = 0;
468 my $thisround_failed = 0;
469 my $thisround_failed_multiple = 0;
471 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
472 or $a <=> $b } @jobstep_todo;
473 my $level = $jobstep[$jobstep_todo[0]]->{level};
474 Log (undef, "start level $level");
479 my @freeslot = (0..$#slot);
482 my $progress_is_dirty = 1;
483 my $progress_stats_updated = 0;
485 update_progress_stats();
490 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
492 $main::please_continue = 0;
494 my $id = $jobstep_todo[$todo_ptr];
495 my $Jobstep = $jobstep[$id];
496 if ($Jobstep->{level} != $level)
500 if ($Jobstep->{attempts} > 9)
502 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
507 pipe $reader{$id}, "writer" or croak ($!);
508 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
509 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
511 my $childslot = $freeslot[0];
512 my $childnode = $slot[$childslot]->{node};
513 my $childslotname = join (".",
514 $slot[$childslot]->{node}->{name},
515 $slot[$childslot]->{cpu});
516 my $childpid = fork();
519 $SIG{'INT'} = 'DEFAULT';
520 $SIG{'QUIT'} = 'DEFAULT';
521 $SIG{'TERM'} = 'DEFAULT';
523 foreach (values (%reader))
527 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
528 open(STDOUT,">&writer");
529 open(STDERR,">&writer");
534 delete $ENV{"GNUPGHOME"};
535 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
536 $ENV{"TASK_QSEQUENCE"} = $id;
537 $ENV{"TASK_SEQUENCE"} = $level;
538 $ENV{"JOB_SCRIPT"} = $Job->{script};
539 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
540 $param =~ tr/a-z/A-Z/;
541 $ENV{"JOB_PARAMETER_$param"} = $value;
543 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
544 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
545 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
546 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
552 "--nodelist=".$childnode->{name},
553 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
554 "--job-name=$job_id.$id.$$",
556 my @execargs = qw(sh);
557 my $build_script_to_send = "";
559 "mkdir -p $ENV{CRUNCH_TMP}/revision "
560 ."&& cd $ENV{CRUNCH_TMP} ";
563 $build_script_to_send = $build_script;
567 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
569 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
570 my @execargs = ('bash', '-c', $command);
571 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
575 if (!defined $childpid)
582 $proc{$childpid} = { jobstep => $id,
585 jobstepname => "$job_id.$id.$childpid",
587 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
588 $slot[$childslot]->{pid} = $childpid;
590 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
591 Log ($id, "child $childpid started on $childslotname");
592 $Jobstep->{attempts} ++;
593 $Jobstep->{starttime} = time;
594 $Jobstep->{node} = $childnode->{name};
595 $Jobstep->{slotindex} = $childslot;
596 delete $Jobstep->{stderr};
597 delete $Jobstep->{finishtime};
599 splice @jobstep_todo, $todo_ptr, 1;
602 $progress_is_dirty = 1;
606 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
608 last THISROUND if $main::please_freeze;
609 if ($main::please_info)
611 $main::please_info = 0;
615 update_progress_stats();
623 update_progress_stats();
624 select (undef, undef, undef, 0.1);
626 elsif (time - $progress_stats_updated >= 30)
628 update_progress_stats();
630 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
631 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
633 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
634 .($thisround_failed+$thisround_succeeded)
635 .") -- giving up on this round";
636 Log (undef, $message);
640 # move slots from freeslot to holdslot (or back to freeslot) if necessary
641 for (my $i=$#freeslot; $i>=0; $i--) {
642 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
643 push @holdslot, (splice @freeslot, $i, 1);
646 for (my $i=$#holdslot; $i>=0; $i--) {
647 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
648 push @freeslot, (splice @holdslot, $i, 1);
652 # give up if no nodes are succeeding
653 if (!grep { $_->{node}->{losing_streak} == 0 &&
654 $_->{node}->{hold_count} < 4 } @slot) {
655 my $message = "Every node has failed -- giving up on this round";
656 Log (undef, $message);
663 push @freeslot, splice @holdslot;
664 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
667 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
670 goto THISROUND if $main::please_continue;
671 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
676 update_progress_stats();
677 select (undef, undef, undef, 0.1);
678 killem (keys %proc) if $main::please_freeze;
682 update_progress_stats();
683 freeze_if_want_freeze();
686 if (!defined $success)
689 $thisround_succeeded == 0 &&
690 ($thisround_failed == 0 || $thisround_failed > 4))
692 my $message = "stop because $thisround_failed tasks failed and none succeeded";
693 Log (undef, $message);
702 goto ONELEVEL if !defined $success;
705 release_allocation();
707 $Job->{'output'} = &collate_output();
708 $Job->{'success'} = $Job->{'output'} && $success;
709 $Job->save if $job_has_uuid;
711 if ($Job->{'output'})
714 my $manifest_text = capturex("whget", $Job->{'output'});
715 $arv->{'collections'}->{'create'}->execute('collection' => {
716 'uuid' => $Job->{'output'},
717 'manifest_text' => $manifest_text,
721 Log (undef, "Failed to register output manifest: $@");
725 Log (undef, "finish");
732 sub update_progress_stats
734 $progress_stats_updated = time;
735 return if !$progress_is_dirty;
736 my ($todo, $done, $running) = (scalar @jobstep_todo,
737 scalar @jobstep_done,
738 scalar @slot - scalar @freeslot - scalar @holdslot);
739 $Job->{'tasks_summary'} ||= {};
740 $Job->{'tasks_summary'}->{'todo'} = $todo;
741 $Job->{'tasks_summary'}->{'done'} = $done;
742 $Job->{'tasks_summary'}->{'running'} = $running;
743 $Job->save if $job_has_uuid;
744 Log (undef, "status: $done done, $running running, $todo todo");
745 $progress_is_dirty = 0;
752 my $pid = waitpid (-1, WNOHANG);
753 return 0 if $pid <= 0;
755 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
757 . $slot[$proc{$pid}->{slot}]->{cpu});
758 my $jobstepid = $proc{$pid}->{jobstep};
759 my $elapsed = time - $proc{$pid}->{time};
760 my $Jobstep = $jobstep[$jobstepid];
763 my $exitinfo = "exit $exitcode";
764 $Jobstep->{'arvados_task'}->reload;
765 my $success = $Jobstep->{'arvados_task'}->{success};
767 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
769 if (!defined $success) {
770 # task did not indicate one way or the other --> fail
771 $Jobstep->{'arvados_task'}->{success} = 0;
772 $Jobstep->{'arvados_task'}->save;
778 my $no_incr_attempts;
779 $no_incr_attempts = 1 if $Jobstep->{node_fail};
782 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
784 # Check for signs of a failed or misconfigured node
785 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
786 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
787 # Don't count this against jobstep failure thresholds if this
788 # node is already suspected faulty and srun exited quickly
789 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
791 $Jobstep->{attempts} > 1) {
792 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
793 $no_incr_attempts = 1;
794 --$Jobstep->{attempts};
796 ban_node_by_slot($proc{$pid}->{slot});
799 push @jobstep_todo, $jobstepid;
800 Log ($jobstepid, "failure in $elapsed seconds");
802 --$Jobstep->{attempts} if $no_incr_attempts;
803 $Job->{'tasks_summary'}->{'failed'}++;
807 ++$thisround_succeeded;
808 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
809 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
810 push @jobstep_done, $jobstepid;
811 Log ($jobstepid, "success in $elapsed seconds");
813 $Jobstep->{exitcode} = $exitcode;
814 $Jobstep->{finishtime} = time;
815 process_stderr ($jobstepid, $success);
816 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
818 close $reader{$jobstepid};
819 delete $reader{$jobstepid};
820 delete $slot[$proc{$pid}->{slot}]->{pid};
821 push @freeslot, $proc{$pid}->{slot};
825 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
827 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
829 'order' => 'qsequence'
831 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
833 'level' => $arvados_task->{'sequence'},
835 'arvados_task' => $arvados_task
837 push @jobstep, $jobstep;
838 push @jobstep_todo, $#jobstep;
841 $progress_is_dirty = 1;
848 # return if the kill list was checked <4 seconds ago
849 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
853 $squeue_kill_checked = time;
855 # use killem() on procs whose killtime is reached
858 if (exists $proc{$_}->{killtime}
859 && $proc{$_}->{killtime} <= time)
865 # return if the squeue was checked <60 seconds ago
866 if (defined $squeue_checked && $squeue_checked > time - 60)
870 $squeue_checked = time;
874 # here is an opportunity to check for mysterious problems with local procs
878 # get a list of steps still running
879 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
881 if ($squeue[-1] ne "ok")
887 # which of my jobsteps are running, according to squeue?
891 if (/^(\d+)\.(\d+) (\S+)/)
893 if ($1 eq $ENV{SLURM_JOBID})
900 # which of my active child procs (>60s old) were not mentioned by squeue?
903 if ($proc{$_}->{time} < time - 60
904 && !exists $ok{$proc{$_}->{jobstepname}}
905 && !exists $proc{$_}->{killtime})
907 # kill this proc if it hasn't exited in 30 seconds
908 $proc{$_}->{killtime} = time + 30;
914 sub release_allocation
918 Log (undef, "release job allocation");
919 system "scancel $ENV{SLURM_JOBID}";
927 foreach my $job (keys %reader)
930 while (0 < sysread ($reader{$job}, $buf, 8192))
932 print STDERR $buf if $ENV{CRUNCH_DEBUG};
933 $jobstep[$job]->{stderr} .= $buf;
934 preprocess_stderr ($job);
935 if (length ($jobstep[$job]->{stderr}) > 16384)
937 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
946 sub preprocess_stderr
950 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
952 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
953 Log ($job, "stderr $line");
954 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
956 $main::please_freeze = 1;
958 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
959 $jobstep[$job]->{node_fail} = 1;
960 ban_node_by_slot($jobstep[$job]->{slotindex});
970 preprocess_stderr ($job);
973 Log ($job, "stderr $_");
974 } split ("\n", $jobstep[$job]->{stderr});
980 my $whc = Warehouse->new;
981 Log (undef, "collate");
982 $whc->write_start (1);
986 next if (!exists $_->{'arvados_task'}->{output} ||
987 !$_->{'arvados_task'}->{'success'} ||
988 $_->{'exitcode'} != 0);
989 my $output = $_->{'arvados_task'}->{output};
990 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
992 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
993 $whc->write_data ($output);
995 elsif (@jobstep == 1)
997 $joboutput = $output;
1000 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1002 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1003 $whc->write_data ($outblock);
1007 my $errstr = $whc->errstr;
1008 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1012 $joboutput = $whc->write_finish if !defined $joboutput;
1015 Log (undef, "output $joboutput");
1016 $Job->{'output'} = $joboutput;
1017 $Job->save if $job_has_uuid;
1021 Log (undef, "output undef");
1031 my $sig = 2; # SIGINT first
1032 if (exists $proc{$_}->{"sent_$sig"} &&
1033 time - $proc{$_}->{"sent_$sig"} > 4)
1035 $sig = 15; # SIGTERM if SIGINT doesn't work
1037 if (exists $proc{$_}->{"sent_$sig"} &&
1038 time - $proc{$_}->{"sent_$sig"} > 4)
1040 $sig = 9; # SIGKILL if SIGTERM doesn't work
1042 if (!exists $proc{$_}->{"sent_$sig"})
1044 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1046 select (undef, undef, undef, 0.1);
1049 kill $sig, $_; # srun wants two SIGINT to really interrupt
1051 $proc{$_}->{"sent_$sig"} = time;
1052 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1062 vec($bits,fileno($_),1) = 1;
1068 sub Log # ($jobstep_id, $logmessage)
1070 if ($_[1] =~ /\n/) {
1071 for my $line (split (/\n/, $_[1])) {
1076 my $fh = select STDERR; $|=1; select $fh;
1077 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1078 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1081 if ($metastream || -t STDERR) {
1082 my @gmtime = gmtime;
1083 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1084 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1086 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1088 return if !$metastream;
1089 $metastream->write_data ($datetime . " " . $message);
1095 my ($package, $file, $line) = caller;
1096 my $message = "@_ at $file line $line\n";
1097 Log (undef, $message);
1098 freeze() if @jobstep_todo;
1099 collate_output() if @jobstep_todo;
1101 save_meta() if $metastream;
1108 return if !$job_has_uuid;
1109 $Job->reload if $job_has_uuid;
1110 $Job->{'running'} = 0;
1111 $Job->{'success'} = 0;
1112 $Job->{'finished_at'} = gmtime;
1113 $Job->save if $job_has_uuid;
1119 my $justcheckpoint = shift; # false if this will be the last meta saved
1120 my $m = $metastream;
1121 $m = $m->copy if $justcheckpoint;
1123 my $loglocator = $m->as_key;
1124 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1125 Log (undef, "meta key is $loglocator");
1126 $Job->{'log'} = $loglocator;
1127 $Job->save if $job_has_uuid;
1131 sub freeze_if_want_freeze
1133 if ($main::please_freeze)
1135 release_allocation();
1138 # kill some srun procs before freeze+stop
1139 map { $proc{$_} = {} } @_;
1142 killem (keys %proc);
1143 select (undef, undef, undef, 0.1);
1145 while (($died = waitpid (-1, WNOHANG)) > 0)
1147 delete $proc{$died};
1162 Log (undef, "Freeze not implemented");
1169 croak ("Thaw not implemented");
1173 Log (undef, "thaw from $key");
1178 @jobstep_tomerge = ();
1179 $jobstep_tomerge_level = 0;
# Open the stream named by the comma-separated key, using explicit
# method-call syntax rather than the indirect-object "new Class (...)" form.
1182 my $stream = Warehouse::Stream->new ( whc => $whc,
1183 hash => [split (",", $key)] );
1185 while (my $dataref = $stream->read_until (undef, "\n\n"))
1187 if ($$dataref =~ /^job /)
1189 foreach (split ("\n", $$dataref))
1191 my ($k, $v) = split ("=", $_, 2);
1192 $frozenjob->{$k} = freezeunquote ($v);
1197 if ($$dataref =~ /^merge (\d+) (.*)/)
1199 $jobstep_tomerge_level = $1;
1201 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1206 foreach (split ("\n", $$dataref))
1208 my ($k, $v) = split ("=", $_, 2);
1209 $Jobstep->{$k} = freezeunquote ($v) if $k;
1211 $Jobstep->{attempts} = 0;
1212 push @jobstep, $Jobstep;
1214 if ($Jobstep->{exitcode} eq "0")
1216 push @jobstep_done, $#jobstep;
1220 push @jobstep_todo, $#jobstep;
1224 foreach (qw (script script_version script_parameters))
1226 $Job->{$_} = $frozenjob->{$_};
1228 $Job->save if $job_has_uuid;
1244 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1251 my $srunargs = shift;
1252 my $execargs = shift;
1253 my $opts = shift || {};
1255 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1256 print STDERR (join (" ",
1257 map { / / ? "'$_'" : $_ }
1260 if $ENV{CRUNCH_DEBUG};
1262 if (defined $stdin) {
1263 my $child = open STDIN, "-|";
1264 defined $child or die "no fork: $!";
1266 print $stdin or die $!;
1267 close STDOUT or die $!;
1272 return system (@$args) if $opts->{fork};
1275 warn "ENV size is ".length(join(" ",%ENV));
1276 die "exec failed: $!: @$args";
1280 sub ban_node_by_slot {
1281 # Don't start any new jobsteps on this node for 60 seconds
1283 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1284 $slot[$slotid]->{node}->{hold_count}++;
1285 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1291 # checkout-and-build
1295 my $destdir = $ENV{"CRUNCH_SRC"};
1296 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1297 my $repo = $ENV{"CRUNCH_SRC_URL"};
1299 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1301 if (readlink ("$destdir.commit") eq $commit) {
1305 unlink "$destdir.commit";
1306 open STDOUT, ">", "$destdir.log";
1307 open STDERR, ">&STDOUT";
1310 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1313 die "'tar -C $destdir -xf -' exited $?: $!";
1317 chomp ($pwd = `pwd`);
1318 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1320 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1322 shell_or_die ("./tests/autotests.sh", $install_dir);
1323 } elsif (-e "./install.sh") {
1324 shell_or_die ("./install.sh", $install_dir);
1328 unlink "$destdir.commit.new";
1329 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1330 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1339 if ($ENV{"DEBUG"}) {
1340 print STDERR "@_\n";
1343 or die "@_ failed: $! exit 0x".sprintf("%x",$?);