2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
# Set up the scratch directory layout, parse command-line options, and decide
# whether --job names an Arvados job UUID or an inline JSON job description.
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"}); # return value ignored (e.g. when the directory already exists)
87 GetOptions('force-unlock' => \$force_unlock,
88 'git-dir=s' => \$git_dir,
90 'job-api-token=s' => \$job_api_token,
91 'resume-stash=s' => \$resume_stash,
# --job-api-token replaces ARVADOS_API_TOKEN for this process and for every
# child it starts (children inherit %ENV).
94 if (defined $job_api_token) {
95 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A jobspec made only of lowercase letters, digits and dashes is treated as a
# job UUID to fetch; anything else is decoded as JSON and run as a local job.
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
100 my $local_job = !$job_has_uuid;
# CRUNCH_DEBUG=1 makes srun() echo its command line and echoes task stderr.
109 $main::ENV{CRUNCH_DEBUG} = 1;
113 $main::ENV{CRUNCH_DEBUG} = 0;
# Connect to Arvados, open the log stream, and load (or create) the job
# record. A UUID jobspec must name an unclaimed, never-started job unless
# --force-unlock was given.
118 my $arv = Arvados->new;
119 my $metastream = Warehouse::Stream->new(whc => Warehouse->new);
121 $metastream->write_start('log.txt');
123 my $User = $arv->{'users'}->{'current'}->execute;
131 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
132 if (!$force_unlock) {
133 if ($Job->{'is_locked_by'}) {
134 croak("Job is locked: " . $Job->{'is_locked_by'});
# 'success' must still be null. Test definedness: `ne undef` compared
# stringified values (and warns under `use warnings`).
136 if (defined $Job->{'success'}) {
137 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
139 if ($Job->{'running'}) {
140 croak("Job 'running' flag is already set");
142 if ($Job->{'started_at'}) {
143 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Inline JSON job description.
149 $Job = JSON::decode_json($jobspec);
# Each required attribute must be present and true; croak on the first one
# missing (same order as before, without map in void context).
153 croak ("No $_ specified") foreach grep { !$Job->{$_} }
154 qw(script script_version script_parameters);
157 $Job->{'is_locked_by'} = $User->{'uuid'};
158 $Job->{'started_at'} = gmtime;
160 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
164 $job_id = $Job->{'uuid'};
# max_tasks_per_node 0 means "no per-node cap" (it is only applied when nonzero).
168 $Job->{'resource_limits'} ||= {};
169 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
170 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
# Build the slot table: one entry per (node, cpu) that job steps can be
# scheduled on. Nodes come from the local CPU count and, when SLURM_NODELIST
# is set, from `sinfo`.
173 Log (undef, "check slurm allocation");
176 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# (0 + count) || 1: a zero processor count falls back to a single CPU.
180 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
181 push @sinfo, "$localcpus localhost";
183 if (exists $ENV{SLURM_NODELIST})
185 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Presumably iterating @sinfo: each line is "<ncpus> <nodelist>".
189 my ($ncpus, $slurm_nodelist) = split;
# Cap per-node slots at resource_limits.max_tasks_per_node when it is nonzero.
190 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated entry (with an optional [..] suffix) off the front.
193 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
# Expand a bracketed numeric range list such as "name[1-3,7]".
196 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
199 foreach (split (",", $ranges))
212 push @nodelist, map {
214 $n =~ s/\[[-,\d]+\]/$_/;
221 push @nodelist, $nodelist;
224 foreach my $nodename (@nodelist)
226 Log (undef, "node $nodename - $ncpus slots");
227 my $node = { name => $nodename,
231 foreach my $cpu (1..$ncpus)
233 push @slot, { node => $node,
237 push @node, @nodelist;
242 # Ensure that we get one jobstep running on each allocated node before
243 # we start overloading nodes with concurrent steps
245 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
252 # Claim this job, and make sure nobody else does
# Mark the job locked by the current user and running, reset the task
# summary, save, and abort unless the saved record shows our lock.
254 $Job->{'is_locked_by'} = $User->{'uuid'};
255 $Job->{'started_at'} = gmtime;
256 $Job->{'running'} = 1;
257 $Job->{'success'} = undef;
258 $Job->{'tasks_summary'} = { 'failed' => 0,
# UUIDs are strings, so compare with `eq`. Numeric `==` numifies both UUIDs
# (typically to 0), so the ownership check passed even when another user
# held the lock.
263 unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {
264 croak("Error while updating / locking job");
# Install signal handlers, export job-level environment, initialise the
# jobstep bookkeeping, and seed the queue.
270 Log (undef, "start");
# INT/QUIT/TSTP request a freeze; ALRM requests a checkpoint (handled in the
# wait loop via freeze/collate_output/save_meta); CONT restarts the current
# round; TERM calls croak().
271 $SIG{'INT'} = sub { $main::please_freeze = 1; };
272 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
273 $SIG{'TERM'} = \&croak;
274 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
275 $SIG{'ALRM'} = sub { $main::please_info = 1; };
276 $SIG{'CONT'} = sub { $main::please_continue = 1; };
277 $main::please_freeze = 0;
278 $main::please_info = 0;
279 $main::please_continue = 0;
280 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Export "NOCACHE...=value" knob lines into the environment. A foreach loop
# replaces grep in void context (whose result list was discarded).
282 foreach (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ }
283 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
284 $ENV{"JOB_UUID"} = $job_id;
288 my @jobstep_todo = ();
289 my @jobstep_done = ();
290 my @jobstep_tomerge = ();
291 my $jobstep_tomerge_level = 0;
293 my $squeue_kill_checked;
294 my $output_in_keep = 0;
# Resume from a frozen state when the job names one (thaw() currently croaks).
298 if (defined $Job->{thawedfromkey})
300 thaw ($Job->{thawedfromkey});
# Seed the queue with a single level-0 job task created through the API.
304 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
305 'job_uuid' => $Job->{'uuid'},
310 push @jobstep, { 'level' => 0,
312 'arvados_task' => $first_task,
314 push @jobstep_todo, 0;
# Decide whether code must be installed, load the build script, and wipe the
# per-node work/opt directories before installing.
321 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# A local job whose script_version is an absolute path runs that tree in place.
323 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
326 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
332 $build_script = <DATA>;
334 Log (undef, "Install revision ".$Job->{script_version});
335 my $nodelist = join(",", @node);
337 # Clean out crunch_tmp/work and crunch_tmp/opt
339 my $cleanpid = fork();
342 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
343 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
# Poll without blocking so a freeze request is still honoured; sleep 100ms
# between polls.
348 last if $cleanpid == waitpid (-1, WNOHANG);
349 freeze_if_want_freeze ($cleanpid);
350 select (undef, undef, undef, 0.1);
352 Log (undef, "Clean-work-dir exited $?");
354 # Install requested code version
# Resolve script_version to a full commit id in a local clone of the
# repository. Then run the DATA build script on every allocated node, with
# the matching `git archive` appended to its stdin.
357 my @srunargs = ("srun",
358 "--nodelist=$nodelist",
359 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
361 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
362 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
363 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
367 my $treeish = $Job->{'script_version'};
368 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
369 # Todo: let script_version specify repository instead of expecting
370 # parent process to figure it out.
371 $ENV{"CRUNCH_SRC_URL"} = $repo;
373 # Create/update our clone of the remote git repo
375 if (!-d $ENV{"CRUNCH_SRC"}) {
376 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
377 or croak ("git clone $repo failed: exit ".($?>>8));
378 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
# The repository URL reaches the shell through $CRUNCH_SRC_URL, not by interpolation.
380 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
382 # If this looks like a subversion r#, look for it in git-svn commit messages
384 if ($treeish =~ m{^\d{1,4}$}) {
385 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
387 if ($gitlog =~ /^[a-f0-9]{40}$/) {
389 Log (undef, "Using commit $commit for script_version $treeish");
393 # If that didn't work, try asking git to look it up as a tree-ish.
395 if (!defined $commit) {
397 my $cooked_treeish = $treeish;
398 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
399 # Looks like a git branch name -- make sure git knows it's
400 # relative to the remote repo
401 $cooked_treeish = "origin/$treeish";
# script_version comes from the job record. \Q...\E backslash-escapes it so
# the shell treats it as one literal word; otherwise a value such as
# "x; rm -rf ~" would be executed.
404 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 \Q$cooked_treeish\E`;
406 if ($found =~ /^[0-9a-f]{40}$/s) {
408 if ($commit ne $treeish) {
409 # Make sure we record the real commit id in the database,
410 # frozentokey, logs, etc. -- instead of an abbreviation or a
411 # branch name which can become ambiguous or point to a
412 # different commit in the future.
413 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
414 Log (undef, "Using commit $commit for tree-ish $treeish");
415 if ($commit ne $treeish) { # NOTE(review): repeats the "408" test; redundant if nested inside it
416 $Job->{'script_version'} = $commit;
417 !$job_has_uuid or $Job->save() or croak("Error while updating job");
423 if (defined $commit) {
424 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
425 @execargs = ("sh", "-c",
426 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
427 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
430 croak ("could not figure out commit id for $treeish");
433 my $installpid = fork();
434 if ($installpid == 0)
436 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Poll for the install child without blocking so a freeze request is honoured.
441 last if $installpid == waitpid (-1, WNOHANG);
442 freeze_if_want_freeze ($installpid);
443 select (undef, undef, undef, 0.1);
445 Log (undef, "Install exited $?");
# Record the job's defining attributes and knobs in the log.
450 foreach (qw (script script_version script_parameters resource_limits))
# Structured values (references) are logged as JSON, plain scalars as-is.
454 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
456 foreach (split (/\n/, $Job->{knobs}))
458 Log (undef, "knob " . $_);
# Start a scheduling round: reset per-round counters, pick the lowest pending
# level, and mark every slot free.
469 my $thisround_succeeded = 0;
470 my $thisround_failed = 0;
471 my $thisround_failed_multiple = 0;
# Order pending steps by level, then by step index.
473 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
474 or $a <=> $b } @jobstep_todo;
# Only steps at this (lowest pending) level are launched in this round.
475 my $level = $jobstep[$jobstep_todo[0]]->{level};
476 Log (undef, "start level $level");
481 my @freeslot = (0..$#slot);
484 my $progress_is_dirty = 1;
485 my $progress_stats_updated = 0;
487 update_progress_stats();
# Launch pending steps of the current level into free slots, one forked child
# per step. The child runs its task through srun() with stdout/stderr sent to
# a pipe whose read end the parent keeps in %reader.
492 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
494 $main::please_continue = 0;
496 my $id = $jobstep_todo[$todo_ptr];
497 my $Jobstep = $jobstep[$id];
# Steps at other levels wait for a later round.
498 if ($Jobstep->{level} != $level)
502 if ($Jobstep->{attempts} > 9)
504 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
# The parent's read end is made non-blocking so it can be polled.
509 pipe $reader{$id}, "writer" or croak ($!);
510 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
511 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Use the first free slot.
513 my $childslot = $freeslot[0];
514 my $childnode = $slot[$childslot]->{node};
515 my $childslotname = join (".",
516 $slot[$childslot]->{node}->{name},
517 $slot[$childslot]->{cpu});
518 my $childpid = fork();
# Child side: restore default handling for the signals the parent traps.
521 $SIG{'INT'} = 'DEFAULT';
522 $SIG{'QUIT'} = 'DEFAULT';
523 $SIG{'TERM'} = 'DEFAULT';
525 foreach (values (%reader))
# NOTE(review): F_SETFL sets file-status flags (here clearing e.g. O_NONBLOCK);
# FD_CLOEXEC is controlled by F_SETFD. Confirm what the comment intends.
529 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
530 open(STDOUT,">&writer");
531 open(STDERR,">&writer");
# Per-task environment seen by the crunch script.
536 delete $ENV{"GNUPGHOME"};
537 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
538 $ENV{"TASK_QSEQUENCE"} = $id;
539 $ENV{"TASK_SEQUENCE"} = $level;
540 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Each script parameter is exported as JOB_PARAMETER_<NAME> (name upper-cased).
541 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
542 $param =~ tr/a-z/A-Z/;
543 $ENV{"JOB_PARAMETER_$param"} = $value;
545 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
546 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
547 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
548 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
554 "--nodelist=".$childnode->{name},
555 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
556 "--job-name=$job_id.$id.$$",
558 my @execargs = qw(sh);
559 my $build_script_to_send = "";
561 "mkdir -p $ENV{CRUNCH_WORK} $ENV{CRUNCH_TMP} "
562 ."&& cd $ENV{CRUNCH_TMP} ";
565 $build_script_to_send = $build_script;
569 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
571 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
# NOTE(review): redeclares @execargs from the qw(sh) line above; confirm the
# two declarations are in different scopes.
572 my @execargs = ('bash', '-c', $command);
573 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
577 if (!defined $childpid)
# Parent side: record the child, its slot, and the step's attempt/timing state.
584 $proc{$childpid} = { jobstep => $id,
587 jobstepname => "$job_id.$id.$childpid",
589 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
590 $slot[$childslot]->{pid} = $childpid;
592 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
593 Log ($id, "child $childpid started on $childslotname");
594 $Jobstep->{attempts} ++;
595 $Jobstep->{starttime} = time;
596 $Jobstep->{node} = $childnode->{name};
597 $Jobstep->{slotindex} = $childslot;
598 delete $Jobstep->{stderr};
599 delete $Jobstep->{finishtime};
# The launched step leaves the todo list.
601 splice @jobstep_todo, $todo_ptr, 1;
604 $progress_is_dirty = 1;
# Wait for running children while keeping progress stats fresh. Give up on
# the round when failures dominate or every node is failing, then drain the
# remaining children and decide whether to start another level.
608 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
610 last THISROUND if $main::please_freeze;
611 if ($main::please_info)
613 $main::please_info = 0;
617 update_progress_stats();
625 update_progress_stats();
626 select (undef, undef, undef, 0.1);
628 elsif (time - $progress_stats_updated >= 30)
630 update_progress_stats();
# Abort the round when retried steps keep failing: 8+ repeat failures with no
# successes, or 16+ repeat failures outnumbering successes.
632 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
633 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
635 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
636 .($thisround_failed+$thisround_succeeded)
637 .") -- giving up on this round";
638 Log (undef, $message);
642 # move slots from freeslot to holdslot (or back to freeslot) if necessary
643 for (my $i=$#freeslot; $i>=0; $i--) {
644 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
645 push @holdslot, (splice @freeslot, $i, 1);
648 for (my $i=$#holdslot; $i>=0; $i--) {
649 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
650 push @freeslot, (splice @holdslot, $i, 1);
654 # give up if no nodes are succeeding
655 if (!grep { $_->{node}->{losing_streak} == 0 &&
656 $_->{node}->{hold_count} < 4 } @slot) {
657 my $message = "Every node has failed -- giving up on this round";
658 Log (undef, $message);
# Return held slots to the free pool and clear each free slot's node losing
# streak (foreach instead of map in void context).
665 push @freeslot, splice @holdslot;
666 $slot[$_]->{node}->{losing_streak} = 0 foreach @freeslot;
669 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
672 goto THISROUND if $main::please_continue;
# Checkpoint on request: separate statements instead of a comma-operator chain.
673 if ($main::please_info) { $main::please_info = 0; freeze(); collate_output(); save_meta(1); }
678 update_progress_stats();
679 select (undef, undef, undef, 0.1);
680 killem (keys %proc) if $main::please_freeze;
684 update_progress_stats();
685 freeze_if_want_freeze();
688 if (!defined $success)
691 $thisround_succeeded == 0 &&
692 ($thisround_failed == 0 || $thisround_failed > 4))
694 my $message = "stop because $thisround_failed tasks failed and none succeeded";
695 Log (undef, $message);
704 goto ONELEVEL if !defined $success;
# Job finished or gave up: release the allocation, collate task outputs,
# record the final status, and register the output manifest as a collection.
707 release_allocation();
# Plain call: the &name() form bypasses prototypes and gains nothing here.
710 $Job->{'output'} = collate_output();
711 $Job->{'running'} = 0;
# Success requires both a collated output and an overall $success.
712 $Job->{'success'} = $Job->{'output'} && $success;
713 $Job->{'finished_at'} = gmtime;
714 $Job->save if $job_has_uuid;
716 if ($Job->{'output'})
719 my $manifest_text = capturex("whget", $Job->{'output'});
720 $arv->{'collections'}->{'create'}->execute('collection' => {
721 'uuid' => $Job->{'output'},
722 'manifest_text' => $manifest_text,
# A registration failure is logged with $@ rather than aborting the job.
726 Log (undef, "Failed to register output manifest: $@");
730 Log (undef, "finish");
# Push the current todo/done/running counts into $Job->{tasks_summary}, save
# the job record (jobs with a UUID only), and log a status line. Unless
# $progress_is_dirty is set, only $progress_stats_updated is refreshed.
737 sub update_progress_stats
739 $progress_stats_updated = time;
740 return if !$progress_is_dirty;
# Parenthesize each count. `scalar` is a named unary operator and binds more
# loosely than `-`, so the unparenthesized form parsed as
# scalar(@slot - scalar(@freeslot - scalar @holdslot)), which ADDED the
# held slots to "running" instead of subtracting them.
741 my ($todo, $done, $running) = (scalar @jobstep_todo,
742 scalar @jobstep_done,
743 scalar(@slot) - scalar(@freeslot) - scalar(@holdslot));
744 $Job->{'tasks_summary'} ||= {};
745 $Job->{'tasks_summary'}->{'todo'} = $todo;
746 $Job->{'tasks_summary'}->{'done'} = $done;
747 $Job->{'tasks_summary'}->{'running'} = $running;
748 $Job->save if $job_has_uuid;
749 Log (undef, "status: $done done, $running running, $todo todo");
750 $progress_is_dirty = 0;
# Reap one finished child without blocking (returns 0 if none has exited).
# Records the task outcome, updates node-health bookkeeping, frees the slot,
# and queues any job tasks the finished task created.
757 my $pid = waitpid (-1, WNOHANG);
758 return 0 if $pid <= 0;
760 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
762 . $slot[$proc{$pid}->{slot}]->{cpu});
763 my $jobstepid = $proc{$pid}->{jobstep};
764 my $elapsed = time - $proc{$pid}->{time};
765 my $Jobstep = $jobstep[$jobstepid];
768 my $exitinfo = "exit $exitcode";
# The task reports its own success through the API; re-read it.
769 $Jobstep->{'arvados_task'}->reload;
770 my $success = $Jobstep->{'arvados_task'}->{success};
772 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
774 if (!defined $success) {
775 # task did not indicate one way or the other --> fail
776 $Jobstep->{'arvados_task'}->{success} = 0;
777 $Jobstep->{'arvados_task'}->save;
783 my $no_incr_attempts;
784 $no_incr_attempts = 1 if $Jobstep->{node_fail};
787 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
789 # Check for signs of a failed or misconfigured node
790 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
791 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
792 # Don't count this against jobstep failure thresholds if this
793 # node is already suspected faulty and srun exited quickly
794 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
796 $Jobstep->{attempts} > 1) {
797 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
# The attempt is refunded once, below, through $no_incr_attempts. Also
# decrementing here refunded two attempts for a single launch.
798 $no_incr_attempts = 1;
801 ban_node_by_slot($proc{$pid}->{slot});
804 push @jobstep_todo, $jobstepid;
805 Log ($jobstepid, "failure in $elapsed seconds");
807 --$Jobstep->{attempts} if $no_incr_attempts;
808 $Job->{'tasks_summary'}->{'failed'}++;
812 ++$thisround_succeeded;
813 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
814 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
815 push @jobstep_done, $jobstepid;
816 Log ($jobstepid, "success in $elapsed seconds");
818 $Jobstep->{exitcode} = $exitcode;
819 $Jobstep->{finishtime} = time;
820 process_stderr ($jobstepid, $success);
821 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the pipe and the slot.
823 close $reader{$jobstepid};
824 delete $reader{$jobstepid};
825 delete $slot[$proc{$pid}->{slot}]->{pid};
826 push @freeslot, $proc{$pid}->{slot};
# Queue the tasks this task created (ordered by qsequence); each task's
# 'sequence' becomes its level.
830 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
832 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
834 'order' => 'qsequence'
836 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
838 'level' => $arvados_task->{'sequence'},
840 'arvados_task' => $arvados_task
842 push @jobstep, $jobstep;
843 push @jobstep_todo, $#jobstep;
846 $progress_is_dirty = 1;
# Periodically reconcile child processes with SLURM. Every 4s, kill children
# whose killtime has passed. Every 60s, schedule a kill (30s later) for
# children older than 60s that squeue no longer lists.
853 # return if the kill list was checked <4 seconds ago
854 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
858 $squeue_kill_checked = time;
860 # use killem() on procs whose killtime is reached
863 if (exists $proc{$_}->{killtime}
864 && $proc{$_}->{killtime} <= time)
870 # return if the squeue was checked <60 seconds ago
871 if (defined $squeue_checked && $squeue_checked > time - 60)
875 $squeue_checked = time;
879 # here is an opportunity to check for mysterious problems with local procs
883 # get a list of steps still running
884 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
# NOTE(review): backtick output lines keep their trailing newline; this test
# assumes @squeue was chomped in between. Confirm.
886 if ($squeue[-1] ne "ok")
892 # which of my jobsteps are running, according to squeue?
896 if (/^(\d+)\.(\d+) (\S+)/)
898 if ($1 eq $ENV{SLURM_JOBID})
905 # which of my active child procs (>60s old) were not mentioned by squeue?
# The srun job name set in the child ("$job_id.$id.$$") equals the
# parent's recorded jobstepname ("$job_id.$id.$childpid").
908 if ($proc{$_}->{time} < time - 60
909 && !exists $ok{$proc{$_}->{jobstepname}}
910 && !exists $proc{$_}->{killtime})
912 # kill this proc if it hasn't exited in 30 seconds
913 $proc{$_}->{killtime} = time + 30;
# Cancel this process's SLURM allocation (the job named by SLURM_JOBID).
919 sub release_allocation
923 Log (undef, "release job allocation");
# List form: the job id is passed to scancel as one argument, with no shell.
924 system "scancel", $ENV{SLURM_JOBID};
# Drain whatever is currently readable from every child's output pipe into
# that jobstep's stderr buffer, processing complete lines as they arrive.
932 foreach my $job (keys %reader)
# The read ends are non-blocking, so this stops once no data is available.
935 while (0 < sysread ($reader{$job}, $buf, 8192))
937 print STDERR $buf if $ENV{CRUNCH_DEBUG};
938 $jobstep[$job]->{stderr} .= $buf;
939 preprocess_stderr ($job);
# Keep the buffer bounded: drop the oldest 8 KiB once it exceeds 16 KiB.
940 if (length ($jobstep[$job]->{stderr}) > 16384)
942 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Move each complete line out of a jobstep's stderr buffer and log it. Also
# react to known srun errors:
#   - an expired or unconfirmed allocation requests a freeze;
#   - a node failure or step-creation error marks the step node_fail and
#     backs off its node.
951 sub preprocess_stderr
955 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
957 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
958 Log ($job, "stderr $line");
959 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
961 $main::please_freeze = 1;
963 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
964 $jobstep[$job]->{node_fail} = 1;
965 ban_node_by_slot($jobstep[$job]->{slotindex});
# Flush a finished jobstep's stderr buffer: handle complete lines first, then
# log whatever remains, line by line.
975 preprocess_stderr ($job);
978 Log ($job, "stderr $_");
979 } split ("\n", $jobstep[$job]->{stderr});
# Combine the outputs of successful steps (task success, output present,
# exit code 0) into one job output and store it on $Job->{output}.
985 my $whc = Warehouse->new;
986 Log (undef, "collate");
987 $whc->write_start (1);
991 next if (!exists $_->{'arvados_task'}->{output} ||
992 !$_->{'arvados_task'}->{'success'} ||
993 $_->{'exitcode'} != 0);
994 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare locator is appended as-is (manifest text).
995 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
997 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
998 $whc->write_data ($output);
# With a single jobstep, its locator is used directly as the job output.
1000 elsif (@jobstep == 1)
1002 $joboutput = $output;
# Otherwise fetch the referenced block and append its contents.
1005 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1007 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1008 $whc->write_data ($outblock);
1012 my $errstr = $whc->errstr;
1013 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1017 $joboutput = $whc->write_finish if !defined $joboutput;
1020 Log (undef, "output $joboutput");
1021 $Job->{'output'} = $joboutput;
1022 $Job->save if $job_has_uuid;
1026 Log (undef, "output undef");
# Signal each given child pid with escalating severity:
#   - SIGINT first;
#   - SIGTERM once SIGINT was sent more than 4s ago;
#   - SIGKILL once SIGTERM was sent more than 4s ago.
# Each level is used at most once per pid (recorded as sent_<sig> in %proc).
1036 my $sig = 2; # SIGINT first
1037 if (exists $proc{$_}->{"sent_$sig"} &&
1038 time - $proc{$_}->{"sent_$sig"} > 4)
1040 $sig = 15; # SIGTERM if SIGINT doesn't work
1042 if (exists $proc{$_}->{"sent_$sig"} &&
1043 time - $proc{$_}->{"sent_$sig"} > 4)
1045 $sig = 9; # SIGKILL if SIGTERM doesn't work
1047 if (!exists $proc{$_}->{"sent_$sig"})
1049 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1051 select (undef, undef, undef, 0.1);
1054 kill $sig, $_; # srun wants two SIGINT to really interrupt
1056 $proc{$_}->{"sent_$sig"} = time;
1057 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set the bit for this handle's file descriptor (the bit-vector format used by 4-argument select).
1067 vec($bits,fileno($_),1) = 1;
# Log a message tagged with the job id, this pid, and a jobstep id (undef for
# job-level messages). Output goes to stderr and, while $metastream is open,
# to the log stream. Messages containing newlines are handled line by line,
# and non-printable characters are escaped as backslash-octal.
1073 sub Log # ($jobstep_id, $logmessage)
1075 if ($_[1] =~ /\n/) {
1076 for my $line (split (/\n/, $_[1])) {
1081 my $fh = select STDERR; $|=1; select $fh;
1082 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1083 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# Timestamps (UTC) are only computed when they will be used.
1086 if ($metastream || -t STDERR) {
1087 my @gmtime = gmtime;
1088 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1089 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Only a terminal gets the timestamp prefix on stderr.
1091 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1093 return if !$metastream;
1094 $metastream->write_data ($datetime . " " . $message);
# Fatal-error handler (also the SIGTERM handler). It logs the message with
# the caller's file and line, freezes and collates while steps are still
# pending, and saves the log.
1100 my ($package, $file, $line) = caller;
1101 my $message = "@_ at $file line $line\n";
1102 Log (undef, $message);
1103 freeze() if @jobstep_todo;
1104 collate_output() if @jobstep_todo;
1106 save_meta() if $metastream;
# Mark the job record as no longer running and unsuccessful (only for jobs
# fetched by UUID).
1113 return if !$job_has_uuid;
1115 $Job->{'running'} = 0;
1116 $Job->{'success'} = 0;
1117 $Job->{'finished_at'} = gmtime;
# Store the log stream and record its locator in $Job->{log}. For a
# checkpoint, a copy is stored and logging continues. Otherwise the stream
# itself is stored and $metastream is cleared.
1124 my $justcheckpoint = shift; # false if this will be the last meta saved
1125 my $m = $metastream;
1126 $m = $m->copy if $justcheckpoint;
1128 my $loglocator = $m->as_key;
1129 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1130 Log (undef, "meta key is $loglocator");
1131 $Job->{'log'} = $loglocator;
1132 $Job->save if $job_has_uuid;
# If a freeze was requested (by a signal handler or a fatal srun error),
# release the SLURM allocation, then signal and reap the child processes.
# Extra pids passed as arguments (e.g. the clean-up or install child) are
# included.
1136 sub freeze_if_want_freeze
1138 if ($main::please_freeze)
1140 release_allocation();
1143 # kill some srun procs before freeze+stop
# Register the extra pids in %proc so killem() targets them too
# (foreach instead of map in void context).
1144 $proc{$_} = {} foreach @_;
1147 killem (keys %proc);
1148 select (undef, undef, undef, 0.1);
1150 while (($died = waitpid (-1, WNOHANG)) > 0)
1152 delete $proc{$died};
# Checkpointing is not implemented: the visible body only logs that fact.
1167 Log (undef, "Freeze not implemented");
# Restore job state from a frozen log key. The function croaks immediately.
# NOTE(review): the parsing code below only runs if croak() returns; confirm
# it is dead code.
1174 croak ("Thaw not implemented");
1178 Log (undef, "thaw from $key");
1183 @jobstep_tomerge = ();
1184 $jobstep_tomerge_level = 0;
# The frozen data is a sequence of blank-line-separated records.
1187 my $stream = new Warehouse::Stream ( whc => $whc,
1188 hash => [split (",", $key)] );
1190 while (my $dataref = $stream->read_until (undef, "\n\n"))
# "job ..." record: key=value lines describing the frozen job.
1192 if ($$dataref =~ /^job /)
1194 foreach (split ("\n", $$dataref))
1196 my ($k, $v) = split ("=", $_, 2);
1197 $frozenjob->{$k} = freezeunquote ($v);
# "merge <level> <list>" record: steps pending a merge at that level.
1202 if ($$dataref =~ /^merge (\d+) (.*)/)
1204 $jobstep_tomerge_level = $1;
1206 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
# Any other record is a jobstep; exitcode "0" means done, anything else todo.
1211 foreach (split ("\n", $$dataref))
1213 my ($k, $v) = split ("=", $_, 2);
1214 $Jobstep->{$k} = freezeunquote ($v) if $k;
1216 $Jobstep->{attempts} = 0;
1217 push @jobstep, $Jobstep;
1219 if ($Jobstep->{exitcode} eq "0")
1221 push @jobstep_done, $#jobstep;
1225 push @jobstep_todo, $#jobstep;
1229 foreach (qw (script script_version script_parameters))
1231 $Job->{$_} = $frozenjob->{$_};
1233 $Job->save if $job_has_uuid;
# Undo freeze quoting: "\n" becomes a newline, and any other backslash-escaped character becomes itself.
1249 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run a command. Under SLURM the srun arguments are prepended; otherwise the
# command runs directly. A defined $stdin is fed to the command's STDIN. With
# $opts->{fork} the command runs through system() and its status is returned.
1256 my $srunargs = shift;
1257 my $execargs = shift;
1258 my $opts = shift || {};
1260 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1261 print STDERR (join (" ",
1262 map { / / ? "'$_'" : $_ }
1265 if $ENV{CRUNCH_DEBUG};
# The forking open makes STDIN read from a child process; that child prints
# $stdin to it and exits.
1267 if (defined $stdin) {
1268 my $child = open STDIN, "-|";
1269 defined $child or die "no fork: $!";
1271 print $stdin or die $!;
1272 close STDOUT or die $!;
1277 return system (@$args) if $opts->{fork};
# Failure path after exec: report the environment size (presumably because
# an oversized environment can make exec fail) and die.
1280 warn "ENV size is ".length(join(" ",%ENV));
1281 die "exec failed: $!: @$args";
# Put the node behind a slot on hold and count the hold. The scheduling loop
# keeps that node's slots out of @freeslot until hold_until has passed.
1285 sub ban_node_by_slot {
1286 # Don't start any new jobsteps on this node for 60 seconds
1288 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1289 $slot[$slotid]->{node}->{hold_count}++;
1290 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Presumably the DATA build script run as `perl -` on each node. It unpacks
# the requested commit into $CRUNCH_SRC and runs the tree's installer,
# skipping the work when that commit is already installed.
1296 # checkout-and-build
1300 my $destdir = $ENV{"CRUNCH_SRC"};
1301 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1302 my $repo = $ENV{"CRUNCH_SRC_URL"};
# NOTE(review): presumably locked with flock on a following line; confirm.
1304 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# $destdir.commit is a symlink naming the last commit successfully installed.
1306 if (readlink ("$destdir.commit") eq $commit) {
# Invalidate the marker before (re)installing; installer output goes to $destdir.log.
1310 unlink "$destdir.commit";
1311 open STDOUT, ">", "$destdir.log";
1312 open STDERR, ">&STDOUT";
1315 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1318 die "'tar -C $destdir -xf -' exited $?: $!";
1322 chomp ($pwd = `pwd`);
1323 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Use ./install.sh; fall back to ./tests/autotests.sh only when install.sh is absent.
1325 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1327 shell_or_die ("./tests/autotests.sh", $install_dir);
1328 } elsif (-e "./install.sh") {
1329 shell_or_die ("./install.sh", $install_dir);
# Replace the marker symlink by rename so it always names a complete install.
1333 unlink "$destdir.commit.new";
1334 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1335 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# Run the given command, echoing it first when DEBUG is set. On failure, die
# with $! and the raw wait status ($?) in hex.
1344 if ($ENV{"DEBUG"}) {
1345 print STDERR "@_\n";
1348 or die "@_ failed: $! exit 0x".sprintf("%x",$?);