2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or deallocated). Currently this is a no-op.
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
77 $ENV{"TMPDIR"} ||= "/tmp";
78 unless (defined $ENV{"CRUNCH_TMP"}) {
79 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
80 if ($ENV{"USER"} ne "crunch" && $< != 0) {
81 # use a tmp dir unique for my uid
82 $ENV{"CRUNCH_TMP"} .= "-$<";
85 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
86 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
87 mkdir ($ENV{"JOB_WORK"});
94 GetOptions('force-unlock' => \$force_unlock,
95 'git-dir=s' => \$git_dir,
97 'job-api-token=s' => \$job_api_token,
98 'resume-stash=s' => \$resume_stash,
101 if (defined $job_api_token) {
102 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
105 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
106 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
107 my $local_job = !$job_has_uuid;
116 $main::ENV{CRUNCH_DEBUG} = 1;
120 $main::ENV{CRUNCH_DEBUG} = 0;
125 my $arv = Arvados->new;
126 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
128 $metastream->write_start('log.txt');
130 my $User = $arv->{'users'}->{'current'}->execute;
138 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Refuse to run a job that is already claimed, started, or finished, unless
# the caller asked to override these checks with --force-unlock.
if (!$force_unlock) {
  if ($Job->{'is_locked_by_uuid'}) {
    croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
  }
  # 'success' stays undef until the job has finished.  Test definedness
  # rather than comparing against undef with "ne" (which is really a string
  # comparison against "" and would let an empty-string value slip through).
  if (defined $Job->{'success'}) {
    croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
  }
  if ($Job->{'running'}) {
    croak("Job 'running' flag is already set");
  }
  if ($Job->{'started_at'}) {
    croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
  }
}
156 $Job = JSON::decode_json($jobspec);
160 map { croak ("No $_ specified") unless $Job->{$_} }
161 qw(script script_version script_parameters);
164 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
165 $Job->{'started_at'} = gmtime;
167 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
171 $job_id = $Job->{'uuid'};
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
180 Log (undef, "check slurm allocation");
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
187 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188 push @sinfo, "$localcpus localhost";
190 if (exists $ENV{SLURM_NODELIST})
192 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
196 my ($ncpus, $slurm_nodelist) = split;
197 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
200 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
203 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
206 foreach (split (",", $ranges))
219 push @nodelist, map {
221 $n =~ s/\[[-,\d]+\]/$_/;
228 push @nodelist, $nodelist;
231 foreach my $nodename (@nodelist)
233 Log (undef, "node $nodename - $ncpus slots");
234 my $node = { name => $nodename,
238 foreach my $cpu (1..$ncpus)
240 push @slot, { node => $node,
244 push @node, @nodelist;
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259 # Claim this job, and make sure nobody else does
261 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
262 $Job->{'started_at'} = gmtime;
263 $Job->{'running'} = 1;
264 $Job->{'success'} = undef;
265 $Job->{'tasks_summary'} = { 'failed' => 0,
# Confirm that the save succeeded and that the lock is really ours.
# UUIDs are strings, so they must be compared with "eq": with numeric "=="
# both sides are converted to numbers (0 for most UUIDs), which makes the
# comparison true for almost any pair of UUIDs and defeats the check.
unless ($Job->save() && $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
  croak("Error while updating / locking job");
}
277 Log (undef, "start");
278 $SIG{'INT'} = sub { $main::please_freeze = 1; };
279 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
280 $SIG{'TERM'} = \&croak;
281 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
282 $SIG{'ALRM'} = sub { $main::please_info = 1; };
283 $SIG{'CONT'} = sub { $main::please_continue = 1; };
284 $main::please_freeze = 0;
285 $main::please_info = 0;
286 $main::please_continue = 0;
287 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every NOCACHE*=value line from the job's newline-separated "knobs"
# text into %ENV.  A plain loop replaces grep-in-void-context, and a missing
# knobs field is treated as empty text instead of splitting undef.
foreach my $knob (split ("\n", defined $Job->{knobs} ? $Job->{knobs} : "")) {
  $ENV{$1} = $2 if $knob =~ /^(NOCACHE.*?)=(.*)/;
}
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
305 if (defined $Job->{thawedfromkey})
307 thaw ($Job->{thawedfromkey});
311 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312 'job_uuid' => $Job->{'uuid'},
317 push @jobstep, { 'level' => 0,
319 'arvados_task' => $first_task,
321 push @jobstep_todo, 0;
328 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
333 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
339 $build_script = <DATA>;
341 Log (undef, "Install revision ".$Job->{script_version});
342 my $nodelist = join(",", @node);
344 # Clean out crunch_tmp/work and crunch_tmp/opt
346 my $cleanpid = fork();
349 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
350 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
355 last if $cleanpid == waitpid (-1, WNOHANG);
356 freeze_if_want_freeze ($cleanpid);
357 select (undef, undef, undef, 0.1);
359 Log (undef, "Clean-work-dir exited $?");
361 # Install requested code version
364 my @srunargs = ("srun",
365 "--nodelist=$nodelist",
366 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
368 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
369 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
370 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
374 my $treeish = $Job->{'script_version'};
375 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
376 # Todo: let script_version specify repository instead of expecting
377 # parent process to figure it out.
378 $ENV{"CRUNCH_SRC_URL"} = $repo;
380 # Create/update our clone of the remote git repo
382 if (!-d $ENV{"CRUNCH_SRC"}) {
383 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
384 or croak ("git clone $repo failed: exit ".($?>>8));
385 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
387 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
389 # If this looks like a subversion r#, look for it in git-svn commit messages
391 if ($treeish =~ m{^\d{1,4}$}) {
392 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
394 if ($gitlog =~ /^[a-f0-9]{40}$/) {
396 Log (undef, "Using commit $commit for script_version $treeish");
400 # If that didn't work, try asking git to look it up as a tree-ish.
402 if (!defined $commit) {
404 my $cooked_treeish = $treeish;
405 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
406 # Looks like a git branch name -- make sure git knows it's
407 # relative to the remote repo
408 $cooked_treeish = "origin/$treeish";
411 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
413 if ($found =~ /^[0-9a-f]{40}$/s) {
if ($commit ne $treeish) {
  # Make sure we record the real commit id in the database,
  # frozentokey, logs, etc. -- instead of an abbreviation or a
  # branch name which can become ambiguous or point to a
  # different commit in the future.
  $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
  Log (undef, "Using commit $commit for tree-ish $treeish");
  # The enclosing test already established that the resolved commit differs
  # from the requested tree-ish, so the job record is updated directly
  # (previously this was wrapped in a second, identical test).
  $Job->{'script_version'} = $commit;
  !$job_has_uuid or $Job->save() or croak("Error while updating job");
}
430 if (defined $commit) {
431 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
432 @execargs = ("sh", "-c",
433 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
434 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
437 croak ("could not figure out commit id for $treeish");
440 my $installpid = fork();
441 if ($installpid == 0)
443 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
448 last if $installpid == waitpid (-1, WNOHANG);
449 freeze_if_want_freeze ($installpid);
450 select (undef, undef, undef, 0.1);
452 Log (undef, "Install exited $?");
457 foreach (qw (script script_version script_parameters runtime_constraints))
461 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
463 foreach (split (/\n/, $Job->{knobs}))
465 Log (undef, "knob " . $_);
470 $main::success = undef;
476 my $thisround_succeeded = 0;
477 my $thisround_failed = 0;
478 my $thisround_failed_multiple = 0;
480 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
481 or $a <=> $b } @jobstep_todo;
482 my $level = $jobstep[$jobstep_todo[0]]->{level};
483 Log (undef, "start level $level");
488 my @freeslot = (0..$#slot);
491 my $progress_is_dirty = 1;
492 my $progress_stats_updated = 0;
494 update_progress_stats();
499 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
501 my $id = $jobstep_todo[$todo_ptr];
502 my $Jobstep = $jobstep[$id];
503 if ($Jobstep->{level} != $level)
508 pipe $reader{$id}, "writer" or croak ($!);
509 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
510 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
512 my $childslot = $freeslot[0];
513 my $childnode = $slot[$childslot]->{node};
514 my $childslotname = join (".",
515 $slot[$childslot]->{node}->{name},
516 $slot[$childslot]->{cpu});
517 my $childpid = fork();
520 $SIG{'INT'} = 'DEFAULT';
521 $SIG{'QUIT'} = 'DEFAULT';
522 $SIG{'TERM'} = 'DEFAULT';
524 foreach (values (%reader))
528 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
529 open(STDOUT,">&writer");
530 open(STDERR,">&writer");
535 delete $ENV{"GNUPGHOME"};
536 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
537 $ENV{"TASK_QSEQUENCE"} = $id;
538 $ENV{"TASK_SEQUENCE"} = $level;
539 $ENV{"JOB_SCRIPT"} = $Job->{script};
540 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
541 $param =~ tr/a-z/A-Z/;
542 $ENV{"JOB_PARAMETER_$param"} = $value;
544 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
545 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
546 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
547 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
548 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
554 "--nodelist=".$childnode->{name},
555 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
556 "--job-name=$job_id.$id.$$",
558 my @execargs = qw(sh);
559 my $build_script_to_send = "";
561 "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
562 ."&& cd $ENV{CRUNCH_TMP} ";
565 $build_script_to_send = $build_script;
569 $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
570 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
571 $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
573 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
574 my @execargs = ('bash', '-c', $command);
575 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
579 if (!defined $childpid)
586 $proc{$childpid} = { jobstep => $id,
589 jobstepname => "$job_id.$id.$childpid",
591 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
592 $slot[$childslot]->{pid} = $childpid;
594 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
595 Log ($id, "child $childpid started on $childslotname");
596 $Jobstep->{starttime} = time;
597 $Jobstep->{node} = $childnode->{name};
598 $Jobstep->{slotindex} = $childslot;
599 delete $Jobstep->{stderr};
600 delete $Jobstep->{finishtime};
602 splice @jobstep_todo, $todo_ptr, 1;
605 $progress_is_dirty = 1;
609 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
611 last THISROUND if $main::please_freeze;
612 if ($main::please_info)
614 $main::please_info = 0;
618 update_progress_stats();
626 update_progress_stats();
627 select (undef, undef, undef, 0.1);
629 elsif (time - $progress_stats_updated >= 30)
631 update_progress_stats();
633 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
634 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
636 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
637 .($thisround_failed+$thisround_succeeded)
638 .") -- giving up on this round";
639 Log (undef, $message);
643 # move slots from freeslot to holdslot (or back to freeslot) if necessary
644 for (my $i=$#freeslot; $i>=0; $i--) {
645 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
646 push @holdslot, (splice @freeslot, $i, 1);
649 for (my $i=$#holdslot; $i>=0; $i--) {
650 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
651 push @freeslot, (splice @holdslot, $i, 1);
655 # give up if no nodes are succeeding
656 if (!grep { $_->{node}->{losing_streak} == 0 &&
657 $_->{node}->{hold_count} < 4 } @slot) {
658 my $message = "Every node has failed -- giving up on this round";
659 Log (undef, $message);
666 push @freeslot, splice @holdslot;
667 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
670 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
673 if ($main::please_continue) {
674 $main::please_continue = 0;
677 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
682 update_progress_stats();
683 select (undef, undef, undef, 0.1);
684 killem (keys %proc) if $main::please_freeze;
688 update_progress_stats();
689 freeze_if_want_freeze();
692 if (!defined $main::success)
695 $thisround_succeeded == 0 &&
696 ($thisround_failed == 0 || $thisround_failed > 4))
698 my $message = "stop because $thisround_failed tasks failed and none succeeded";
699 Log (undef, $message);
708 goto ONELEVEL if !defined $main::success;
711 release_allocation();
714 $Job->{'output'} = &collate_output();
715 $Job->{'running'} = 0;
716 $Job->{'success'} = $Job->{'output'} && $main::success;
717 $Job->{'finished_at'} = gmtime;
718 $Job->save if $job_has_uuid;
720 if ($Job->{'output'})
723 my $manifest_text = capturex("whget", $Job->{'output'});
724 $arv->{'collections'}->{'create'}->execute('collection' => {
725 'uuid' => $Job->{'output'},
726 'manifest_text' => $manifest_text,
730 Log (undef, "Failed to register output manifest: $@");
734 Log (undef, "finish");
# Refresh the job's tasks_summary counters (todo / done / running), save the
# job record when it has a UUID, and log a one-line status message.
# The "last updated" timestamp is always refreshed; everything else is
# skipped unless progress has been marked dirty since the previous call.
sub update_progress_stats
{
  $progress_stats_updated = time;
  unless ($progress_is_dirty) {
    return;
  }

  my $n_todo = @jobstep_todo;
  my $n_done = @jobstep_done;
  # A slot counts as running when it is neither free nor on hold.
  my $n_running = @slot - @freeslot - @holdslot;

  my $summary = ($Job->{'tasks_summary'} ||= {});
  @{$summary}{'todo', 'done', 'running'} = ($n_todo, $n_done, $n_running);

  if ($job_has_uuid) {
    $Job->save;
  }
  Log (undef, "status: $n_done done, $n_running running, $n_todo todo");
  $progress_is_dirty = 0;
}
761 my $pid = waitpid (-1, WNOHANG);
762 return 0 if $pid <= 0;
764 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
766 . $slot[$proc{$pid}->{slot}]->{cpu});
767 my $jobstepid = $proc{$pid}->{jobstep};
768 my $elapsed = time - $proc{$pid}->{time};
769 my $Jobstep = $jobstep[$jobstepid];
771 my $childstatus = $?;
772 my $exitvalue = $childstatus >> 8;
773 my $exitinfo = sprintf("exit %d signal %d%s",
776 ($childstatus & 128 ? ' core dump' : ''));
777 $Jobstep->{'arvados_task'}->reload;
778 my $task_success = $Jobstep->{'arvados_task'}->{success};
780 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
782 if (!defined $task_success) {
783 # task did not indicate one way or the other --> fail
784 $Jobstep->{'arvados_task'}->{success} = 0;
785 $Jobstep->{'arvados_task'}->save;
792 $temporary_fail ||= $Jobstep->{node_fail};
793 $temporary_fail ||= ($exitvalue == 111);
796 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
798 # Check for signs of a failed or misconfigured node
799 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
800 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
801 # Don't count this against jobstep failure thresholds if this
802 # node is already suspected faulty and srun exited quickly
803 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
805 Log ($jobstepid, "blaming failure on suspect node " .
806 $slot[$proc{$pid}->{slot}]->{node}->{name});
807 $temporary_fail ||= 1;
809 ban_node_by_slot($proc{$pid}->{slot});
812 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
813 ++$Jobstep->{'failures'},
814 $temporary_fail ? 'temporary ' : 'permanent',
817 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
818 # Give up on this task, and the whole job
820 $main::please_freeze = 1;
823 # Put this task back on the todo queue
824 push @jobstep_todo, $jobstepid;
826 $Job->{'tasks_summary'}->{'failed'}++;
830 ++$thisround_succeeded;
831 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
832 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
833 push @jobstep_done, $jobstepid;
834 Log ($jobstepid, "success in $elapsed seconds");
836 $Jobstep->{exitcode} = $childstatus;
837 $Jobstep->{finishtime} = time;
838 process_stderr ($jobstepid, $task_success);
839 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
841 close $reader{$jobstepid};
842 delete $reader{$jobstepid};
843 delete $slot[$proc{$pid}->{slot}]->{pid};
844 push @freeslot, $proc{$pid}->{slot};
848 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
850 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
852 'order' => 'qsequence'
854 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
856 'level' => $arvados_task->{'sequence'},
858 'arvados_task' => $arvados_task
860 push @jobstep, $jobstep;
861 push @jobstep_todo, $#jobstep;
864 $progress_is_dirty = 1;
871 # return if the kill list was checked <4 seconds ago
872 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
876 $squeue_kill_checked = time;
878 # use killem() on procs whose killtime is reached
881 if (exists $proc{$_}->{killtime}
882 && $proc{$_}->{killtime} <= time)
888 # return if the squeue was checked <60 seconds ago
889 if (defined $squeue_checked && $squeue_checked > time - 60)
893 $squeue_checked = time;
897 # here is an opportunity to check for mysterious problems with local procs
901 # get a list of steps still running
902 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
904 if ($squeue[-1] ne "ok")
910 # which of my jobsteps are running, according to squeue?
914 if (/^(\d+)\.(\d+) (\S+)/)
916 if ($1 eq $ENV{SLURM_JOBID})
923 # which of my active child procs (>60s old) were not mentioned by squeue?
926 if ($proc{$_}->{time} < time - 60
927 && !exists $ok{$proc{$_}->{jobstepname}}
928 && !exists $proc{$_}->{killtime})
930 # kill this proc if it hasn't exited in 30 seconds
931 $proc{$_}->{killtime} = time + 30;
# Give up the scheduler allocation held by this job: logs the action and
# runs "scancel" on the job id found in $ENV{SLURM_JOBID}.
# NOTE(review): the lines between the sub header and the Log call are not
# shown in this listing; presumably a guard for runs without SLURM lives
# there -- confirm against the full file.
937 sub release_allocation
941 Log (undef, "release job allocation");
# The exit status of scancel is not inspected here.
942 system "scancel $ENV{SLURM_JOBID}";
950 foreach my $job (keys %reader)
953 while (0 < sysread ($reader{$job}, $buf, 8192))
955 print STDERR $buf if $ENV{CRUNCH_DEBUG};
956 $jobstep[$job]->{stderr} .= $buf;
957 preprocess_stderr ($job);
958 if (length ($jobstep[$job]->{stderr}) > 16384)
960 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from the front of a job step's buffered stderr.
# Every newline-terminated line is removed from $jobstep[$job]->{stderr},
# logged with a "stderr " prefix, and scanned for two kinds of srun errors:
#   - allocation expired / cannot be confirmed: sets $main::please_freeze
#   - node failure / unable to create job step: flags the step as node_fail
#     and backs off the node that owns the step's slot
# A trailing partial line (no newline yet) is left in the buffer.
# NOTE(review): $job and $line are bound on lines not shown in this listing
# (presumably the sub argument and the captured line) -- confirm.
969 sub preprocess_stderr
973 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Drop the matched line and its newline from the buffer.
975 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
976 Log ($job, "stderr $line");
# NOTE(review): $ENV{SLURM_JOB_ID} is interpolated into the pattern without
# quoting, and other code in this file reads $ENV{SLURM_JOBID} -- confirm
# that both variables are set in the environment this runs in.
977 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
979 $main::please_freeze = 1;
981 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
982 $jobstep[$job]->{node_fail} = 1;
983 ban_node_by_slot($jobstep[$job]->{slotindex});
992 my $task_success = shift;
993 preprocess_stderr ($job);
996 Log ($job, "stderr $_");
997 } split ("\n", $jobstep[$job]->{stderr});
1003 my $whc = Warehouse->new;
1004 Log (undef, "collate");
1005 $whc->write_start (1);
1009 next if (!exists $_->{'arvados_task'}->{output} ||
1010 !$_->{'arvados_task'}->{'success'} ||
1011 $_->{'exitcode'} != 0);
1012 my $output = $_->{'arvados_task'}->{output};
1013 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1015 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1016 $whc->write_data ($output);
1018 elsif (@jobstep == 1)
1020 $joboutput = $output;
1023 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1025 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1026 $whc->write_data ($outblock);
1030 my $errstr = $whc->errstr;
1031 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1035 $joboutput = $whc->write_finish if !defined $joboutput;
1038 Log (undef, "output $joboutput");
1039 $Job->{'output'} = $joboutput;
1040 $Job->save if $job_has_uuid;
1044 Log (undef, "output undef");
1054 my $sig = 2; # SIGINT first
1055 if (exists $proc{$_}->{"sent_$sig"} &&
1056 time - $proc{$_}->{"sent_$sig"} > 4)
1058 $sig = 15; # SIGTERM if SIGINT doesn't work
1060 if (exists $proc{$_}->{"sent_$sig"} &&
1061 time - $proc{$_}->{"sent_$sig"} > 4)
1063 $sig = 9; # SIGKILL if SIGTERM doesn't work
1065 if (!exists $proc{$_}->{"sent_$sig"})
1067 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1069 select (undef, undef, undef, 0.1);
1072 kill $sig, $_; # srun wants two SIGINT to really interrupt
1074 $proc{$_}->{"sent_$sig"} = time;
1075 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1085 vec($bits,fileno($_),1) = 1;
# Emit one log record.
#   $_[0]  job step id (callers pass undef for job-level messages)
#   $_[1]  message text
# A message containing newlines is split on "\n" and iterated line by line.
# Each record is formatted as "<job_id> <pid> <step id> <message>"; every
# character outside the range space .. octal 176 is replaced by a backslash
# followed by its three-digit octal code.  The record is printed to STDERR
# (with a UTC timestamp prefix only when STDERR is a terminal) and, while
# $metastream is set, appended to it with the timestamp prefix.
# NOTE(review): some lines of this sub are not shown in this listing (the
# body of the per-line loop and the declaration of $datetime among them) --
# confirm details against the full file.
1091 sub Log # ($jobstep_id, $logmessage)
1093 if ($_[1] =~ /\n/) {
1094 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1099 my $fh = select STDERR; $|=1; select $fh;
1100 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1101 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# The timestamp is only computed when something will actually use it.
1104 if ($metastream || -t STDERR) {
1105 my @gmtime = gmtime;
1106 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1107 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1109 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1111 return if !$metastream;
1112 $metastream->write_data ($datetime . " " . $message);
1118 my ($package, $file, $line) = caller;
1119 my $message = "@_ at $file line $line\n";
1120 Log (undef, $message);
1121 freeze() if @jobstep_todo;
1122 collate_output() if @jobstep_todo;
1124 save_meta() if $metastream;
1131 return if !$job_has_uuid;
1133 $Job->{'running'} = 0;
1134 $Job->{'success'} = 0;
1135 $Job->{'finished_at'} = gmtime;
1142 my $justcheckpoint = shift; # false if this will be the last meta saved
1143 my $m = $metastream;
1144 $m = $m->copy if $justcheckpoint;
1146 my $loglocator = $m->as_key;
1147 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1148 Log (undef, "meta key is $loglocator");
1149 $Job->{'log'} = $loglocator;
1150 $Job->save if $job_has_uuid;
# If a freeze has been requested ($main::please_freeze), release the
# allocation and shut down the child processes named in @_.
# Arguments: optional list of child pids to register in %proc so that they
# are signalled and reaped together with any already-tracked children.
# NOTE(review): the loop header around killem/waitpid and the remainder of
# the sub (what happens after the children are gone) are not shown in this
# listing -- confirm against the full file.
1154 sub freeze_if_want_freeze
1156 if ($main::please_freeze)
1158 release_allocation();
1161 # kill some srun procs before freeze+stop
# Register each given pid in %proc with an empty record.
1162 map { $proc{$_} = {} } @_;
1165 killem (keys %proc);
# Sleep for 0.1 seconds before checking for exited children.
1166 select (undef, undef, undef, 0.1);
# Reap every child that has already exited, without blocking.
1168 while (($died = waitpid (-1, WNOHANG)) > 0)
1170 delete $proc{$died};
1185 Log (undef, "Freeze not implemented");
1192 croak ("Thaw not implemented");
1196 Log (undef, "thaw from $key");
1201 @jobstep_tomerge = ();
1202 $jobstep_tomerge_level = 0;
1205 my $stream = new Warehouse::Stream ( whc => $whc,
1206 hash => [split (",", $key)] );
1208 while (my $dataref = $stream->read_until (undef, "\n\n"))
1210 if ($$dataref =~ /^job /)
1212 foreach (split ("\n", $$dataref))
1214 my ($k, $v) = split ("=", $_, 2);
1215 $frozenjob->{$k} = freezeunquote ($v);
1220 if ($$dataref =~ /^merge (\d+) (.*)/)
1222 $jobstep_tomerge_level = $1;
1224 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1229 foreach (split ("\n", $$dataref))
1231 my ($k, $v) = split ("=", $_, 2);
1232 $Jobstep->{$k} = freezeunquote ($v) if $k;
1234 $Jobstep->{'failures'} = 0;
1235 push @jobstep, $Jobstep;
1237 if ($Jobstep->{exitcode} eq "0")
1239 push @jobstep_done, $#jobstep;
1243 push @jobstep_todo, $#jobstep;
1247 foreach (qw (script script_version script_parameters))
1249 $Job->{$_} = $frozenjob->{$_};
1251 $Job->save if $job_has_uuid;
1267 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1274 my $srunargs = shift;
1275 my $execargs = shift;
1276 my $opts = shift || {};
1278 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1279 print STDERR (join (" ",
1280 map { / / ? "'$_'" : $_ }
1283 if $ENV{CRUNCH_DEBUG};
1285 if (defined $stdin) {
1286 my $child = open STDIN, "-|";
1287 defined $child or die "no fork: $!";
1289 print $stdin or die $!;
1290 close STDOUT or die $!;
1295 return system (@$args) if $opts->{fork};
1298 warn "ENV size is ".length(join(" ",%ENV));
1299 die "exec failed: $!: @$args";
# Don't start any new jobsteps on a node for a while.
#
# Arguments:
#   $slotid        index into @slot; the node owning that slot is backed off
#   $hold_seconds  optional hold duration in seconds (default: 60)
#
# Side effects: sets the node's hold_until to now + $hold_seconds,
# increments its hold_count, and logs the back-off.
sub ban_node_by_slot {
  my ($slotid, $hold_seconds) = @_;
  # Existing callers pass only the slot index and keep the 60 second hold.
  $hold_seconds = 60 unless defined $hold_seconds;
  my $node = ($slot[$slotid]->{node} ||= {});
  $node->{hold_until} = $hold_seconds + scalar time;
  $node->{hold_count}++;
  Log (undef, "backing off node " . $node->{name} . " for $hold_seconds seconds");
}
1314 # checkout-and-build
1318 my $destdir = $ENV{"CRUNCH_SRC"};
1319 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1320 my $repo = $ENV{"CRUNCH_SRC_URL"};
1322 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1324 if (readlink ("$destdir.commit") eq $commit) {
1328 unlink "$destdir.commit";
1329 open STDOUT, ">", "$destdir.log";
1330 open STDERR, ">&STDOUT";
1333 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1336 die "'tar -C $destdir -xf -' exited $?: $!";
1340 chomp ($pwd = `pwd`);
1341 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1343 if (-e "$destdir/crunch_scripts/install") {
1344 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1345 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1347 shell_or_die ("./tests/autotests.sh", $install_dir);
1348 } elsif (-e "./install.sh") {
1349 shell_or_die ("./install.sh", $install_dir);
1353 unlink "$destdir.commit.new";
1354 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1355 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1364 if ($ENV{"DEBUG"}) {
1365 print STDERR "@_\n";
1368 or die "@_ failed: $! exit 0x".sprintf("%x",$?);