# -*- mode: perl; perl-indent-level: 2; -*-

=head1 NAME

crunch-job: Execute job steps, save snapshots as requested, collate output.

=head1 SYNOPSIS

Obtain job details from Arvados, run tasks on compute nodes (typically
invoked by the scheduler on the controller):

 crunch-job --job x-y-z

Obtain job details from the command line, run tasks on the local machine
(typically invoked by an application or developer on a VM):

 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
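
The inline JSON form accepts the same fields as an Arvados job record;
C<script>, C<script_version>, and C<script_parameters> are required.  As an
illustrative sketch (the parameter names and values shown are examples, not
requirements of crunch-job itself), a local run might look like:

 crunch-job --job '{
   "script": "scriptname",
   "script_version": "/path/to/tree",
   "script_parameters": {"input": "..."},
   "resource_limits": {"max_tasks_per_node": 1}
 }'
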

=head1 OPTIONS

=over

=item --force-unlock

If the job is already locked, steal the lock and run it anyway.

=item --git-dir

Path to the .git directory where the specified commit is found.

=item --job-api-token

Arvados API authorization token to use during the course of the job.

=back

=head1 RUNNING JOBS LOCALLY

crunch-job's log messages appear on stderr along with the job tasks'
stderr streams. The log is saved in Keep at each checkpoint and when
the job finishes.

If the job succeeds, the job's output locator is printed on stdout.

While the job is running, the following signals are accepted:

=over

=item control-C, SIGINT, SIGQUIT

Save a checkpoint, terminate any job tasks that are running, and stop.

=item SIGALRM

Save a checkpoint and continue.

=item SIGHUP

Refresh node allocation (i.e., check whether any nodes have been added
or unallocated). Currently this is a no-op.

=back

=cut

use POSIX ':sys_wait_h';
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Getopt::Long;
use Arvados;
use Warehouse;
use Warehouse::Stream;
use IPC::System::Simple qw(capturex);
use JSON;

$ENV{"TMPDIR"} ||= "/tmp";
$ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
$ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
mkdir ($ENV{"CRUNCH_TMP"});
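
# CRUNCH_TMP is this job's per-node scratch directory and CRUNCH_WORK is
# where task work directories live; CRUNCH_WORK is wiped on every node
# before the requested code version is installed (see the
# "Clean out crunch_tmp/work" step below).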
GetOptions('force-unlock' => \$force_unlock,
'git-dir=s' => \$git_dir,
'job-api-token=s' => \$job_api_token,
'resume-stash=s' => \$resume_stash,
if (defined $job_api_token) {
$ENV{ARVADOS_API_TOKEN} = $job_api_token;
my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
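
# Two ways to specify the job: a bare UUID (fetch the job record from the
# API server, lock it, and update it as the job runs) or an inline JSON
# document (run locally without touching an existing job record).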
$main::ENV{CRUNCH_DEBUG} = 1;
$main::ENV{CRUNCH_DEBUG} = 0;
my $arv = Arvados->new;
my $metastream = Warehouse::Stream->new(whc => new Warehouse);
$metastream->write_start('log.txt');
$Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
$User = $arv->{'users'}->{'current'}->execute;
if (!$force_unlock) {
if ($Job->{'is_locked_by'}) {
croak("Job is locked: " . $Job->{'is_locked_by'});
if (defined $Job->{'success'}) {
croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
if ($Job->{'running'}) {
croak("Job 'running' flag is already set");
if ($Job->{'started_at'}) {
croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
$Job = JSON::decode_json($jobspec);
map { croak ("No $_ specified") unless $Job->{$_} }
qw(script script_version script_parameters);
if (!defined $Job->{'uuid'}) {
chomp (my $hostname = `hostname -s`);
$Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
$job_id = $Job->{'uuid'};
$Job->{'resource_limits'} ||= {};
$Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
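
# max_tasks_per_node (0 = no limit) caps the number of task slots we will
# schedule on any single node, regardless of how many CPUs sinfo reports.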
Log (undef, "check slurm allocation");
# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
push @sinfo, "$localcpus localhost";
if (exists $ENV{SLURM_NODELIST})
push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
my ($ncpus, $slurm_nodelist) = split;
$ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
foreach (split (",", $ranges))
push @nodelist, map {
$n =~ s/\[[-,\d]+\]/$_/;
push @nodelist, $nodelist;
foreach my $nodename (@nodelist)
Log (undef, "node $nodename - $ncpus slots");
my $node = { name => $nodename,
foreach my $cpu (1..$ncpus)
push @slot, { node => $node,
push @node, @nodelist;

# Ensure that we get one jobstep running on each allocated node before
# we start overloading nodes with concurrent steps
@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
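
# @node holds one entry per allocated node name (SLURM bracket syntax such
# as "compute[1-3,5]" has been expanded above); @slot holds one entry per
# (node, cpu) pair and is what the scheduler actually hands out.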
# Claim this job, and make sure nobody else does
$Job->{'is_locked_by'} = $User->{'uuid'};
$Job->{'started_at'} = gmtime;
$Job->{'running'} = 1;
$Job->{'success'} = undef;
$Job->{'tasks_summary'} = { 'failed' => 0,
unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {
croak("Error while updating / locking job");
Log (undef, "start");
$SIG{'INT'} = sub { $main::please_freeze = 1; };
$SIG{'QUIT'} = sub { $main::please_freeze = 1; };
$SIG{'TERM'} = \&croak;
$SIG{'TSTP'} = sub { $main::please_freeze = 1; };
$SIG{'ALRM'} = sub { $main::please_info = 1; };
$SIG{'CONT'} = sub { $main::please_continue = 1; };
$main::please_freeze = 0;
$main::please_info = 0;
$main::please_continue = 0;
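
# Signal handlers only set flags; the main scheduling loop polls
# $main::please_freeze, $main::please_info and $main::please_continue
# between child reaps, so checkpoints and shutdowns happen at safe points
# rather than inside a signal handler.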
my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
$ENV{"CRUNCH_JOB_UUID"} = $job_id;
$ENV{"JOB_UUID"} = $job_id;
my @jobstep_todo = ();
my @jobstep_done = ();
my @jobstep_tomerge = ();
my $jobstep_tomerge_level = 0;
my $squeue_kill_checked;
my $output_in_keep = 0;
if (defined $Job->{thawedfromkey})
thaw ($Job->{thawedfromkey});
my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
'job_uuid' => $Job->{'uuid'},
push @jobstep, { 'level' => 0,
'arvados_task' => $first_task,
push @jobstep_todo, 0;
$build_script = <DATA>;
$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
$ENV{"CRUNCH_SRC"} = $Job->{revision};
Log (undef, "Install revision ".$Job->{revision});
my $nodelist = join(",", @node);

# Clean out crunch_tmp/work and crunch_tmp/opt
my $cleanpid = fork();
srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
last if $cleanpid == waitpid (-1, WNOHANG);
freeze_if_want_freeze ($cleanpid);
select (undef, undef, undef, 0.1);
Log (undef, "Clean-work-dir exited $?");

# Install requested code version
my @srunargs = ("srun",
"--nodelist=$nodelist",
"-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
$ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
$ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
my $treeish = $Job->{'script_version'};
my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
# Todo: let script_version specify repository instead of expecting
# parent process to figure it out.
$ENV{"CRUNCH_SRC_URL"} = $repo;

# Create/update our clone of the remote git repo
if (!-d $ENV{"CRUNCH_SRC"}) {
system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
or croak ("git clone $repo failed: exit ".($?>>8));
system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
`cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;

# If this looks like a subversion r#, look for it in git-svn commit messages
if ($treeish =~ m{^\d{1,4}$}) {
my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
if ($gitlog =~ /^[a-f0-9]{40}$/) {
Log (undef, "Using commit $commit for revision $treeish");

# If that didn't work, try asking git to look it up as a tree-ish.
if (!defined $commit) {
my $cooked_treeish = $treeish;
if ($treeish !~ m{^[0-9a-f]{5,}$}) {
# Looks like a git branch name -- make sure git knows it's
# relative to the remote repo
$cooked_treeish = "origin/$treeish";
my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
if ($found =~ /^[0-9a-f]{40}$/s) {
if ($commit ne $treeish) {
# Make sure we record the real commit id in the database,
# frozentokey, logs, etc. -- instead of an abbreviation or a
# branch name which can become ambiguous or point to a
# different commit in the future.
$ENV{"CRUNCH_SRC_COMMIT"} = $commit;
Log (undef, "Using commit $commit for tree-ish $treeish");
if ($commit ne $treeish) {
$Job->{'script_version'} = $commit;
$Job->save() or croak("Error while updating job");
if (defined $commit) {
$ENV{"CRUNCH_SRC_COMMIT"} = $commit;
@execargs = ("sh", "-c",
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
$git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
croak ("could not figure out commit id for $treeish");
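
# Code installation: a build script (read from this file's DATA section)
# and a "git archive" of the resolved commit are sent over srun's stdin to
# every node; the build script untars the source into CRUNCH_SRC and runs
# install.sh (or tests/autotests.sh) to populate CRUNCH_INSTALL.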
my $installpid = fork();
if ($installpid == 0)
srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
last if $installpid == waitpid (-1, WNOHANG);
freeze_if_want_freeze ($installpid);
select (undef, undef, undef, 0.1);
Log (undef, "Install exited $?");
foreach (qw (script script_version script_parameters resource_limits))
(ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
foreach (split (/\n/, $Job->{knobs}))
Log (undef, "knob " . $_);
my $thisround_succeeded = 0;
my $thisround_failed = 0;
my $thisround_failed_multiple = 0;
@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};
Log (undef, "start level $level");
my @freeslot = (0..$#slot);
my $progress_is_dirty = 1;
my $progress_stats_updated = 0;
update_progress_stats();
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
$main::please_continue = 0;
my $id = $jobstep_todo[$todo_ptr];
my $Jobstep = $jobstep[$id];
if ($Jobstep->{level} != $level)
if ($Jobstep->{attempts} > 9)
Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
pipe $reader{$id}, "writer" or croak ($!);
my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
my $childslot = $freeslot[0];
my $childnode = $slot[$childslot]->{node};
my $childslotname = join (".",
$slot[$childslot]->{node}->{name},
$slot[$childslot]->{cpu});
my $childpid = fork();
$SIG{'INT'} = 'DEFAULT';
$SIG{'QUIT'} = 'DEFAULT';
$SIG{'TERM'} = 'DEFAULT';
foreach (values (%reader))
fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
open(STDOUT,">&writer");
open(STDERR,">&writer");
delete $ENV{"GNUPGHOME"};
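
# The TASK_*, JOB_* and CRUNCH_* variables set below are the interface
# between crunch-job and the task script: the script learns its
# parameters, its sequence number and its assigned slot from the
# environment.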
$ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
$ENV{"TASK_QSEQUENCE"} = $id;
$ENV{"TASK_SEQUENCE"} = $level;
$ENV{"JOB_SCRIPT"} = $Job->{script};
while (my ($param, $value) = each %{$Job->{script_parameters}}) {
$param =~ tr/a-z/A-Z/;
$ENV{"JOB_PARAMETER_$param"} = $value;
$ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
$ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
$ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
$ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
"--nodelist=".$childnode->{name},
qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
"--job-name=$job_id.$id.$$",
my @execargs = qw(sh);
my $build_script_to_send = "";
"mkdir -p $ENV{CRUNCH_TMP}/revision "
."&& cd $ENV{CRUNCH_TMP} ";
$build_script_to_send = $build_script;
elsif (!$skip_install)
." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
." && ./installrevision "
$ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
"&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
my @execargs = ('bash', '-c', $command);
srun (\@srunargs, \@execargs, undef, $build_script_to_send);
if (!defined $childpid)
$proc{$childpid} = { jobstep => $id,
jobstepname => "$job_id.$id.$childpid",
croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
$slot[$childslot]->{pid} = $childpid;
Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
Log ($id, "child $childpid started on $childslotname");
$Jobstep->{attempts} ++;
$Jobstep->{starttime} = time;
$Jobstep->{node} = $childnode->{name};
$Jobstep->{slotindex} = $childslot;
delete $Jobstep->{stderr};
delete $Jobstep->{finishtime};
splice @jobstep_todo, $todo_ptr, 1;
$progress_is_dirty = 1;
(@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
last THISROUND if $main::please_freeze;
if ($main::please_info)
$main::please_info = 0;
update_progress_stats();
update_progress_stats();
select (undef, undef, undef, 0.1);
elsif (time - $progress_stats_updated >= 30)
update_progress_stats();
if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
.($thisround_failed+$thisround_succeeded)
.") -- giving up on this round";
Log (undef, $message);

# move slots from freeslot to holdslot (or back to freeslot) if necessary
for (my $i=$#freeslot; $i>=0; $i--) {
if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
push @holdslot, (splice @freeslot, $i, 1);
for (my $i=$#holdslot; $i>=0; $i--) {
if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
push @freeslot, (splice @holdslot, $i, 1);

# give up if no nodes are succeeding
if (!grep { $_->{node}->{losing_streak} == 0 &&
$_->{node}->{hold_count} < 4 } @slot) {
my $message = "Every node has failed -- giving up on this round";
Log (undef, $message);
push @freeslot, splice @holdslot;
map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
goto THISROUND if $main::please_continue;
$main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
update_progress_stats();
select (undef, undef, undef, 0.1);
killem (keys %proc) if $main::please_freeze;
update_progress_stats();
freeze_if_want_freeze();
if (!defined $success)
$thisround_succeeded == 0 &&
($thisround_failed == 0 || $thisround_failed > 4))
my $message = "stop because $thisround_failed tasks failed and none succeeded";
Log (undef, $message);
goto ONELEVEL if !defined $success;
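
# Level/round semantics: all tasks at the current level are scheduled in
# one round; if the round ends without a definite success/failure verdict,
# ONELEVEL starts another round (failed steps have been pushed back onto
# @jobstep_todo) until the level completes or is abandoned.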
release_allocation();
$Job->{'output'} = &collate_output();
$Job->{'success'} = $Job->{'output'} && $success;
if ($Job->{'output'})
my $manifest_text = capturex("whget", $Job->{'output'});
$arv->{'collections'}->{'create'}->execute('collection' => {
'uuid' => $Job->{'output'},
'manifest_text' => $manifest_text,
Log (undef, "Failed to register output manifest: $@");
Log (undef, "finish");

sub update_progress_stats
$progress_stats_updated = time;
return if !$progress_is_dirty;
my ($todo, $done, $running) = (scalar @jobstep_todo,
scalar @jobstep_done,
scalar @slot - scalar @freeslot - scalar @holdslot);
$Job->{'tasks_summary'} ||= {};
$Job->{'tasks_summary'}->{'todo'} = $todo;
$Job->{'tasks_summary'}->{'done'} = $done;
$Job->{'tasks_summary'}->{'running'} = $running;
Log (undef, "status: $done done, $running running, $todo todo");
$progress_is_dirty = 0;

my $pid = waitpid (-1, WNOHANG);
return 0 if $pid <= 0;
my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
. $slot[$proc{$pid}->{slot}]->{cpu});
my $jobstepid = $proc{$pid}->{jobstep};
my $elapsed = time - $proc{$pid}->{time};
my $Jobstep = $jobstep[$jobstepid];
my $exitinfo = "exit $exitcode";
$Jobstep->{'arvados_task'}->reload;
my $success = $Jobstep->{'arvados_task'}->{success};
Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
if (!defined $success) {
# task did not indicate one way or the other --> fail
$Jobstep->{'arvados_task'}->{success} = 0;
$Jobstep->{'arvados_task'}->save;
my $no_incr_attempts;
$no_incr_attempts = 1 if $Jobstep->{node_fail};
++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
# Check for signs of a failed or misconfigured node
if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
# Don't count this against jobstep failure thresholds if this
# node is already suspected faulty and srun exited quickly
if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
$Jobstep->{attempts} > 1) {
Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
$no_incr_attempts = 1;
--$Jobstep->{attempts};
ban_node_by_slot($proc{$pid}->{slot});
push @jobstep_todo, $jobstepid;
Log ($jobstepid, "failure in $elapsed seconds");
--$Jobstep->{attempts} if $no_incr_attempts;
$Job->{'tasks_summary'}->{'failed'}++;
++$thisround_succeeded;
$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
$slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
push @jobstep_done, $jobstepid;
Log ($jobstepid, "success in $elapsed seconds");
$Jobstep->{exitcode} = $exitcode;
$Jobstep->{finishtime} = time;
process_stderr ($jobstepid, $success);
Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
close $reader{$jobstepid};
delete $reader{$jobstepid};
delete $slot[$proc{$pid}->{slot}]->{pid};
push @freeslot, $proc{$pid}->{slot};
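
# Fetch any new tasks the finished task created (in qsequence order) and
# append them to @jobstep, queued at the level each task declared in its
# "sequence" attribute.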
my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
'order' => 'qsequence'
foreach my $arvados_task (@{$newtask_list->{'items'}}) {
'level' => $arvados_task->{'sequence'},
'arvados_task' => $arvados_task
push @jobstep, $jobstep;
push @jobstep_todo, $#jobstep;
$progress_is_dirty = 1;

# return if the kill list was checked <4 seconds ago
if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
$squeue_kill_checked = time;

# use killem() on procs whose killtime is reached
if (exists $proc{$_}->{killtime}
&& $proc{$_}->{killtime} <= time)

# return if the squeue was checked <60 seconds ago
if (defined $squeue_checked && $squeue_checked > time - 60)
$squeue_checked = time;

# here is an opportunity to check for mysterious problems with local procs

# get a list of steps still running
my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
if ($squeue[-1] ne "ok")

# which of my jobsteps are running, according to squeue?
if (/^(\d+)\.(\d+) (\S+)/)
if ($1 eq $ENV{SLURM_JOBID})

# which of my active child procs (>60s old) were not mentioned by squeue?
if ($proc{$_}->{time} < time - 60
&& !exists $ok{$proc{$_}->{jobstepname}}
&& !exists $proc{$_}->{killtime})
# kill this proc if it hasn't exited in 30 seconds
$proc{$_}->{killtime} = time + 30;
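
# Reconciliation with squeue: any child process more than 60 seconds old
# that squeue no longer lists gets a killtime 30 seconds out; once the
# killtime passes, killem() escalates SIGINT -> SIGTERM -> SIGKILL.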
sub release_allocation
Log (undef, "release job allocation");
system "scancel $ENV{SLURM_JOBID}";

foreach my $job (keys %reader)
while (0 < sysread ($reader{$job}, $buf, 8192))
print STDERR $buf if $ENV{CRUNCH_DEBUG};
$jobstep[$job]->{stderr} .= $buf;
preprocess_stderr ($job);
if (length ($jobstep[$job]->{stderr}) > 16384)
substr ($jobstep[$job]->{stderr}, 0, 8192) = "";

sub preprocess_stderr
while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
Log ($job, "stderr $line");
if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
$main::please_freeze = 1;
elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
$jobstep[$job]->{node_fail} = 1;
ban_node_by_slot($jobstep[$job]->{slotindex});
preprocess_stderr ($job);
Log ($job, "stderr $_");
} split ("\n", $jobstep[$job]->{stderr});
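
# collate_output() merges each successful task's output -- literal manifest
# text, or the contents fetched from a collection locator -- into a single
# Warehouse stream (a single-task job's locator is passed through as-is);
# the resulting locator becomes the job's output.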
my $whc = Warehouse->new;
Log (undef, "collate");
$whc->write_start (1);
next if (!exists $_->{'arvados_task'}->{output} ||
!$_->{'arvados_task'}->{'success'} ||
$_->{'exitcode'} != 0);
my $output = $_->{'arvados_task'}->{output};
if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
$output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
$whc->write_data ($output);
elsif (@jobstep == 1)
$joboutput = $output;
elsif (defined (my $outblock = $whc->fetch_block ($output)))
$output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
$whc->write_data ($outblock);
my $errstr = $whc->errstr;
$whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
$joboutput = $whc->write_finish if !defined $joboutput;
Log (undef, "output $joboutput");
$Job->{'output'} = $joboutput;
Log (undef, "output undef");
my $sig = 2; # SIGINT first
if (exists $proc{$_}->{"sent_$sig"} &&
time - $proc{$_}->{"sent_$sig"} > 4)
$sig = 15; # SIGTERM if SIGINT doesn't work
if (exists $proc{$_}->{"sent_$sig"} &&
time - $proc{$_}->{"sent_$sig"} > 4)
$sig = 9; # SIGKILL if SIGTERM doesn't work
if (!exists $proc{$_}->{"sent_$sig"})
Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
select (undef, undef, undef, 0.1);
kill $sig, $_; # srun wants two SIGINT to really interrupt
$proc{$_}->{"sent_$sig"} = time;
$proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};

vec($bits,fileno($_),1) = 1;

sub Log # ($jobstep_id, $logmessage)
if ($_[1] =~ /\n/) {
for my $line (split (/\n/, $_[1])) {
my $fh = select STDERR; $|=1; select $fh;
my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
$message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
if ($metastream || -t STDERR) {
my @gmtime = gmtime;
$datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
$gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
return if !$metastream;
$metastream->write_data ($datetime . " " . $message);
my ($package, $file, $line) = caller;
my $message = "@_ at $file line $line\n";
Log (undef, $message);
freeze() if @jobstep_todo;
collate_output() if @jobstep_todo;
save_meta() if $metastream;
return if !$job_has_uuid;
$Job->{'running'} = 0;
$Job->{'success'} = 0;
$Job->{'finished_at'} = gmtime;

my $justcheckpoint = shift; # false if this will be the last meta saved
my $m = $metastream;
$m = $m->copy if $justcheckpoint;
my $loglocator = $m->as_key;
undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
Log (undef, "meta key is $loglocator");
$Job->{'log'} = $loglocator;

sub freeze_if_want_freeze
if ($main::please_freeze)
release_allocation();
# kill some srun procs before freeze+stop
map { $proc{$_} = {} } @_;
killem (keys %proc);
select (undef, undef, undef, 0.1);
while (($died = waitpid (-1, WNOHANG)) > 0)
delete $proc{$died};

Log (undef, "Freeze not implemented");

croak ("Thaw not implemented");
Log (undef, "thaw from $key");
@jobstep_tomerge = ();
$jobstep_tomerge_level = 0;
my $stream = new Warehouse::Stream ( whc => $whc,
hash => [split (",", $key)] );
while (my $dataref = $stream->read_until (undef, "\n\n"))
if ($$dataref =~ /^job /)
foreach (split ("\n", $$dataref))
my ($k, $v) = split ("=", $_, 2);
$frozenjob->{$k} = freezeunquote ($v);
if ($$dataref =~ /^merge (\d+) (.*)/)
$jobstep_tomerge_level = $1;
= map { freezeunquote ($_) } split ("\n", freezeunquote($2));
foreach (split ("\n", $$dataref))
my ($k, $v) = split ("=", $_, 2);
$Jobstep->{$k} = freezeunquote ($v) if $k;
$Jobstep->{attempts} = 0;
push @jobstep, $Jobstep;
if ($Jobstep->{exitcode} eq "0")
push @jobstep_done, $#jobstep;
push @jobstep_todo, $#jobstep;
foreach (qw (script script_version script_parameters))
$Job->{$_} = $frozenjob->{$_};

$s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
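
# srun() wrapper: under SLURM the command is launched via srun with
# @$srunargs prepended; otherwise @$execargs runs directly on this host.
# If $stdin is given, it is piped to the child's standard input.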
my $srunargs = shift;
my $execargs = shift;
my $opts = shift || {};
my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
print STDERR (join (" ",
map { / / ? "'$_'" : $_ }
if $ENV{CRUNCH_DEBUG};
if (defined $stdin) {
my $child = open STDIN, "-|";
defined $child or die "no fork: $!";
print $stdin or die $!;
close STDOUT or die $!;
return system (@$args) if $opts->{fork};
warn "ENV size is ".length(join(" ",%ENV));
die "exec failed: $!: @$args";

sub ban_node_by_slot {
# Don't start any new jobsteps on this node for 60 seconds
$slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
$slot[$slotid]->{node}->{hold_count}++;
Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# checkout-and-build
my $destdir = $ENV{"CRUNCH_SRC"};
my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
my $repo = $ENV{"CRUNCH_SRC_URL"};

open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
if (readlink ("$destdir.commit") eq $commit) {
unlink "$destdir.commit";
open STDOUT, ">", "$destdir.log";
open STDERR, ">&STDOUT";
open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
die "'tar -C $destdir -xf -' exited $?: $!";
chomp ($pwd = `pwd`);
my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
if (!-e "./install.sh" && -e "./tests/autotests.sh") {
shell_or_die ("./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
shell_or_die ("./install.sh", $install_dir);
unlink "$destdir.commit.new";
symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
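
# $destdir.commit is a symlink naming the commit that is currently
# installed; writing it via rename() makes the update atomic, so a
# partially-built tree is never mistaken for a finished install.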
if ($ENV{"DEBUG"}) {
print STDERR "@_\n";
or die "@_ failed: $! exit 0x".sprintf("%x",$?);