2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados and run tasks on compute nodes (typically
11 invoked by the scheduler on the controller):
13 crunch-job --job x-y-z
15 Obtain job details from the command line and run tasks on the local machine
16 (typically invoked by an application or developer on a VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
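For example, a local run might combine these options (the values shown are
hypothetical placeholders):

    crunch-job --git-dir /path/to/repo/.git --job-api-token <token> \
        --job '{"script_version":"<commit-or-branch>","script":"scriptname",...}'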
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when the job finishes.
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or deallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel the job if cancelled_at becomes non-nil).
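For example, given the pid of a running crunch-job, a checkpoint or a refresh
can be requested from another shell:

    kill -ALRM <pid>    # save a checkpoint and continue
    kill -HUP <pid>     # refresh node allocation and Job attributes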
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81 if ($ENV{"USER"} ne "crunch" && $< != 0) {
82 # use a tmp dir unique for my uid
83 $ENV{"CRUNCH_TMP"} .= "-$<";
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
88 mkdir ($ENV{"JOB_WORK"});
95 GetOptions('force-unlock' => \$force_unlock,
96 'git-dir=s' => \$git_dir,
98 'job-api-token=s' => \$job_api_token,
99 'resume-stash=s' => \$resume_stash,
102 if (defined $job_api_token) {
103 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
106 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
107 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
108 my $local_job = !$job_has_uuid;
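# A bare uuid in --job means the job record lives in Arvados; anything else
# (e.g. a JSON blob, as in the synopsis above) is treated as a locally
# defined job.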
113 $main::ENV{CRUNCH_DEBUG} = 1;
117 $main::ENV{CRUNCH_DEBUG} = 0;
122 my $arv = Arvados->new;
123 my $metastream = Warehouse::Stream->new(whc => Warehouse->new);
125 $metastream->write_start('log.txt');
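# Everything passed to Log() below is also appended to this stream, which
# save_meta() stores in Keep as log.txt.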
127 my $User = $arv->{'users'}->{'current'}->execute;
135 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
136 if (!$force_unlock) {
137 if ($Job->{'is_locked_by_uuid'}) {
138 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
140 if (defined $Job->{'success'}) {
141 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
143 if ($Job->{'running'}) {
144 croak("Job 'running' flag is already set");
146 if ($Job->{'started_at'}) {
147 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
153 $Job = JSON::decode_json($jobspec);
157 map { croak ("No $_ specified") unless $Job->{$_} }
158 qw(script script_version script_parameters);
161 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
162 $Job->{'started_at'} = gmtime;
164 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
168 $job_id = $Job->{'uuid'};
172 $Job->{'runtime_constraints'} ||= {};
173 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
174 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
177 Log (undef, "check slurm allocation");
180 # Should we use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
184 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
185 push @sinfo, "$localcpus localhost";
187 if (exists $ENV{SLURM_NODELIST})
189 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
193 my ($ncpus, $slurm_nodelist) = split;
194 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
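# Expand the SLURM node list expression into individual host names; e.g.
# (hypothetical names) "compute[0-2,5],login0" becomes compute0 compute1
# compute2 compute5 login0.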
197 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
200 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
203 foreach (split (",", $ranges))
216 push @nodelist, map {
218 $n =~ s/\[[-,\d]+\]/$_/;
225 push @nodelist, $nodelist;
228 foreach my $nodename (@nodelist)
230 Log (undef, "node $nodename - $ncpus slots");
231 my $node = { name => $nodename,
235 foreach my $cpu (1..$ncpus)
237 push @slot, { node => $node,
241 push @node, @nodelist;
246 # Ensure that we get one jobstep running on each allocated node before
247 # we start overloading nodes with concurrent steps
249 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
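# e.g. with two nodes of two CPUs each (hypothetical), the sorted order is
# (node1 cpu0), (node2 cpu0), (node1 cpu1), (node2 cpu1): every node gets
# one task before any node gets a second.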
256 # Claim this job, and make sure nobody else does
257 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
258 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
259 croak("Error while updating / locking job");
261 $Job->update_attributes('started_at' => scalar gmtime,
264 'tasks_summary' => { 'failed' => 0,
271 Log (undef, "start");
272 $SIG{'INT'} = sub { $main::please_freeze = 1; };
273 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
274 $SIG{'TERM'} = \&croak;
275 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
276 $SIG{'ALRM'} = sub { $main::please_info = 1; };
277 $SIG{'CONT'} = sub { $main::please_continue = 1; };
278 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
280 $main::please_freeze = 0;
281 $main::please_info = 0;
282 $main::please_continue = 0;
283 $main::please_refresh = 0;
284 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
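# Copy NOCACHE* knobs from the job record into the environment, so that a
# knobs line such as "NOCACHE_FOO=1" (hypothetical) sets $ENV{"NOCACHE_FOO"}.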
286 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
287 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
288 $ENV{"JOB_UUID"} = $job_id;
292 my @jobstep_todo = ();
293 my @jobstep_done = ();
294 my @jobstep_tomerge = ();
295 my $jobstep_tomerge_level = 0;
297 my $squeue_kill_checked;
298 my $output_in_keep = 0;
299 my $latest_refresh = scalar time;
303 if (defined $Job->{thawedfromkey})
305 thaw ($Job->{thawedfromkey});
309 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
310 'job_uuid' => $Job->{'uuid'},
315 push @jobstep, { 'level' => 0,
317 'arvados_task' => $first_task,
319 push @jobstep_todo, 0;
326 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
328 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
331 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
337 $build_script = <DATA>;
339 Log (undef, "Install revision ".$Job->{script_version});
340 my $nodelist = join(",", @node);
342 # Clean out crunch_tmp/work and crunch_tmp/opt
344 my $cleanpid = fork();
347 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
348 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
353 last if $cleanpid == waitpid (-1, WNOHANG);
354 freeze_if_want_freeze ($cleanpid);
355 select (undef, undef, undef, 0.1);
357 Log (undef, "Clean-work-dir exited $?");
359 # Install requested code version
362 my @srunargs = ("srun",
363 "--nodelist=$nodelist",
364 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
366 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
367 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
368 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
372 my $treeish = $Job->{'script_version'};
373 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
374 # Todo: let script_version specify repository instead of expecting
375 # parent process to figure it out.
376 $ENV{"CRUNCH_SRC_URL"} = $repo;
378 # Create/update our clone of the remote git repo
380 if (!-d $ENV{"CRUNCH_SRC"}) {
381 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
382 or croak ("git clone $repo failed: exit ".($?>>8));
383 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
385 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
387 # If this looks like a subversion r#, look for it in git-svn commit messages
389 if ($treeish =~ m{^\d{1,4}$}) {
390 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
392 if ($gitlog =~ /^[a-f0-9]{40}$/) {
394 Log (undef, "Using commit $commit for script_version $treeish");
398 # If that didn't work, try asking git to look it up as a tree-ish.
400 if (!defined $commit) {
402 my $cooked_treeish = $treeish;
403 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
404 # Looks like a git branch name -- make sure git knows it's
405 # relative to the remote repo
406 $cooked_treeish = "origin/$treeish";
409 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
411 if ($found =~ /^[0-9a-f]{40}$/s) {
413 if ($commit ne $treeish) {
414 # Make sure we record the real commit id in the database,
415 # frozentokey, logs, etc. -- instead of an abbreviation or a
416 # branch name which can become ambiguous or point to a
417 # different commit in the future.
418 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
419 Log (undef, "Using commit $commit for tree-ish $treeish");
420 if ($commit ne $treeish) {
421 $Job->{'script_version'} = $commit;
423 $Job->update_attributes('script_version' => $commit) or
424 croak("Error while updating job");
430 if (defined $commit) {
431 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
432 @execargs = ("sh", "-c",
433 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
434 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
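# (The archive, together with the checkout-and-build script from the
# __DATA__ section below, is piped to "perl -" on every node; that script
# untars the source into CRUNCH_SRC and runs any install step.)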
437 croak ("could not figure out commit id for $treeish");
440 my $installpid = fork();
441 if ($installpid == 0)
443 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
448 last if $installpid == waitpid (-1, WNOHANG);
449 freeze_if_want_freeze ($installpid);
450 select (undef, undef, undef, 0.1);
452 Log (undef, "Install exited $?");
457 foreach (qw (script script_version script_parameters runtime_constraints))
461 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
463 foreach (split (/\n/, $Job->{knobs}))
465 Log (undef, "knob " . $_);
470 $main::success = undef;
476 my $thisround_succeeded = 0;
477 my $thisround_failed = 0;
478 my $thisround_failed_multiple = 0;
480 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
481 or $a <=> $b } @jobstep_todo;
482 my $level = $jobstep[$jobstep_todo[0]]->{level};
483 Log (undef, "start level $level");
488 my @freeslot = (0..$#slot);
491 my $progress_is_dirty = 1;
492 my $progress_stats_updated = 0;
494 update_progress_stats();
499 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
501 my $id = $jobstep_todo[$todo_ptr];
502 my $Jobstep = $jobstep[$id];
503 if ($Jobstep->{level} != $level)
508 pipe $reader{$id}, "writer" or croak ($!);
509 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
510 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
512 my $childslot = $freeslot[0];
513 my $childnode = $slot[$childslot]->{node};
514 my $childslotname = join (".",
515 $slot[$childslot]->{node}->{name},
516 $slot[$childslot]->{cpu});
517 my $childpid = fork();
520 $SIG{'INT'} = 'DEFAULT';
521 $SIG{'QUIT'} = 'DEFAULT';
522 $SIG{'TERM'} = 'DEFAULT';
524 foreach (values (%reader))
528 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
529 open(STDOUT,">&writer");
530 open(STDERR,">&writer");
535 delete $ENV{"GNUPGHOME"};
536 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
537 $ENV{"TASK_QSEQUENCE"} = $id;
538 $ENV{"TASK_SEQUENCE"} = $level;
539 $ENV{"JOB_SCRIPT"} = $Job->{script};
540 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
541 $param =~ tr/a-z/A-Z/;
542 $ENV{"JOB_PARAMETER_$param"} = $value;
544 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
545 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
546 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
547 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
548 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
554 "--nodelist=".$childnode->{name},
555 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
556 "--job-name=$job_id.$id.$$",
558 my @execargs = qw(sh);
559 my $build_script_to_send = "";
561 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
562 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
563 ."&& cd $ENV{CRUNCH_TMP} ";
566 $build_script_to_send = $build_script;
570 $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
571 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
572 $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
574 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
575 my @execargs = ('bash', '-c', $command);
576 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
580 if (!defined $childpid)
587 $proc{$childpid} = { jobstep => $id,
590 jobstepname => "$job_id.$id.$childpid",
592 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
593 $slot[$childslot]->{pid} = $childpid;
595 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
596 Log ($id, "child $childpid started on $childslotname");
597 $Jobstep->{starttime} = time;
598 $Jobstep->{node} = $childnode->{name};
599 $Jobstep->{slotindex} = $childslot;
600 delete $Jobstep->{stderr};
601 delete $Jobstep->{finishtime};
603 splice @jobstep_todo, $todo_ptr, 1;
606 $progress_is_dirty = 1;
610 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
612 last THISROUND if $main::please_freeze;
613 if ($main::please_info)
615 $main::please_info = 0;
619 update_progress_stats();
626 check_refresh_wanted();
628 update_progress_stats();
629 select (undef, undef, undef, 0.1);
631 elsif (time - $progress_stats_updated >= 30)
633 update_progress_stats();
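# Give up on this round early if retried tasks keep failing: 8+ repeat
# failures with no successes, or 16+ repeat failures outnumbering successes.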
635 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
636 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
638 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
639 .($thisround_failed+$thisround_succeeded)
640 .") -- giving up on this round";
641 Log (undef, $message);
645 # move slots from freeslot to holdslot (or back to freeslot) if necessary
646 for (my $i=$#freeslot; $i>=0; $i--) {
647 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
648 push @holdslot, (splice @freeslot, $i, 1);
651 for (my $i=$#holdslot; $i>=0; $i--) {
652 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
653 push @freeslot, (splice @holdslot, $i, 1);
657 # give up if no nodes are succeeding
658 if (!grep { $_->{node}->{losing_streak} == 0 &&
659 $_->{node}->{hold_count} < 4 } @slot) {
660 my $message = "Every node has failed -- giving up on this round";
661 Log (undef, $message);
668 push @freeslot, splice @holdslot;
669 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
672 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
675 if ($main::please_continue) {
676 $main::please_continue = 0;
679 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
683 check_refresh_wanted();
685 update_progress_stats();
686 select (undef, undef, undef, 0.1);
687 killem (keys %proc) if $main::please_freeze;
691 update_progress_stats();
692 freeze_if_want_freeze();
695 if (!defined $main::success)
698 $thisround_succeeded == 0 &&
699 ($thisround_failed == 0 || $thisround_failed > 4))
701 my $message = "stop because $thisround_failed tasks failed and none succeeded";
702 Log (undef, $message);
711 goto ONELEVEL if !defined $main::success;
714 release_allocation();
717 $Job->update_attributes('output' => &collate_output(),
719 'success' => $Job->{'output'} && $main::success,
720 'finished_at' => scalar gmtime)
723 if ($Job->{'output'})
726 my $manifest_text = capturex("whget", $Job->{'output'});
727 $arv->{'collections'}->{'create'}->execute('collection' => {
728 'uuid' => $Job->{'output'},
729 'manifest_text' => $manifest_text,
733 Log (undef, "Failed to register output manifest: $@");
737 Log (undef, "finish");
744 sub update_progress_stats
746 $progress_stats_updated = time;
747 return if !$progress_is_dirty;
748 my ($todo, $done, $running) = (scalar @jobstep_todo,
749 scalar @jobstep_done,
750 scalar @slot - scalar @freeslot - scalar @holdslot);
751 $Job->{'tasks_summary'} ||= {};
752 $Job->{'tasks_summary'}->{'todo'} = $todo;
753 $Job->{'tasks_summary'}->{'done'} = $done;
754 $Job->{'tasks_summary'}->{'running'} = $running;
756 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
758 Log (undef, "status: $done done, $running running, $todo todo");
759 $progress_is_dirty = 0;
766 my $pid = waitpid (-1, WNOHANG);
767 return 0 if $pid <= 0;
769 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
771 . $slot[$proc{$pid}->{slot}]->{cpu});
772 my $jobstepid = $proc{$pid}->{jobstep};
773 my $elapsed = time - $proc{$pid}->{time};
774 my $Jobstep = $jobstep[$jobstepid];
776 my $childstatus = $?;
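# $childstatus is a raw wait(2) status word: exit value in the high byte,
# terminating signal in the low 7 bits, core-dump flag in bit 0x80.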
777 my $exitvalue = $childstatus >> 8;
778 my $exitinfo = sprintf("exit %d signal %d%s",
781 ($childstatus & 128 ? ' core dump' : ''));
782 $Jobstep->{'arvados_task'}->reload;
783 my $task_success = $Jobstep->{'arvados_task'}->{success};
785 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
787 if (!defined $task_success) {
788 # task did not indicate one way or the other --> fail
789 $Jobstep->{'arvados_task'}->{success} = 0;
790 $Jobstep->{'arvados_task'}->save;
797 $temporary_fail ||= $Jobstep->{node_fail};
798 $temporary_fail ||= ($exitvalue == 111);
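# (An exit value of 111 conventionally means "temporary failure, please
# retry"; such tasks are requeued rather than failing the whole job.)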
801 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
803 # Check for signs of a failed or misconfigured node
804 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
805 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
806 # Don't count this against jobstep failure thresholds if this
807 # node is already suspected faulty and srun exited quickly
808 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
810 Log ($jobstepid, "blaming failure on suspect node " .
811 $slot[$proc{$pid}->{slot}]->{node}->{name});
812 $temporary_fail ||= 1;
814 ban_node_by_slot($proc{$pid}->{slot});
817 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
818 ++$Jobstep->{'failures'},
819 $temporary_fail ? 'temporary ' : 'permanent',
822 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
823 # Give up on this task, and the whole job
825 $main::please_freeze = 1;
828 # Put this task back on the todo queue
829 push @jobstep_todo, $jobstepid;
831 $Job->{'tasks_summary'}->{'failed'}++;
835 ++$thisround_succeeded;
836 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
837 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
838 push @jobstep_done, $jobstepid;
839 Log ($jobstepid, "success in $elapsed seconds");
841 $Jobstep->{exitcode} = $childstatus;
842 $Jobstep->{finishtime} = time;
843 process_stderr ($jobstepid, $task_success);
844 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
846 close $reader{$jobstepid};
847 delete $reader{$jobstepid};
848 delete $slot[$proc{$pid}->{slot}]->{pid};
849 push @freeslot, $proc{$pid}->{slot};
853 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
855 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
857 'order' => 'qsequence'
859 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
861 'level' => $arvados_task->{'sequence'},
863 'arvados_task' => $arvados_task
865 push @jobstep, $jobstep;
866 push @jobstep_todo, $#jobstep;
869 $progress_is_dirty = 1;
873 sub check_refresh_wanted
875 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
876 if (@stat && $stat[9] > $latest_refresh) {
877 $latest_refresh = scalar time;
879 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
880 for my $attr ('cancelled_at',
881 'cancelled_by_user_uuid',
882 'cancelled_by_client_uuid') {
883 $Job->{$attr} = $Job2->{$attr};
885 if ($Job->{'cancelled_at'}) {
886 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
887 " by user " . $Job->{cancelled_by_user_uuid});
889 $main::please_freeze = 1;
897 # return if the kill list was checked <4 seconds ago
898 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
902 $squeue_kill_checked = time;
904 # use killem() on procs whose killtime is reached
907 if (exists $proc{$_}->{killtime}
908 && $proc{$_}->{killtime} <= time)
914 # return if the squeue was checked <60 seconds ago
915 if (defined $squeue_checked && $squeue_checked > time - 60)
919 $squeue_checked = time;
923 # here is an opportunity to check for mysterious problems with local procs
927 # get a list of steps still running
928 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
930 if ($squeue[-1] ne "ok")
936 # which of my jobsteps are running, according to squeue?
940 if (/^(\d+)\.(\d+) (\S+)/)
942 if ($1 eq $ENV{SLURM_JOBID})
949 # which of my active child procs (>60s old) were not mentioned by squeue?
952 if ($proc{$_}->{time} < time - 60
953 && !exists $ok{$proc{$_}->{jobstepname}}
954 && !exists $proc{$_}->{killtime})
956 # kill this proc if it hasn't exited in 30 seconds
957 $proc{$_}->{killtime} = time + 30;
963 sub release_allocation
967 Log (undef, "release job allocation");
968 system "scancel $ENV{SLURM_JOBID}";
976 foreach my $job (keys %reader)
979 while (0 < sysread ($reader{$job}, $buf, 8192))
981 print STDERR $buf if $ENV{CRUNCH_DEBUG};
982 $jobstep[$job]->{stderr} .= $buf;
983 preprocess_stderr ($job);
984 if (length ($jobstep[$job]->{stderr}) > 16384)
986 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
995 sub preprocess_stderr
999 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1001 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1002 Log ($job, "stderr $line");
1003 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1005 $main::please_freeze = 1;
1007 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1008 $jobstep[$job]->{node_fail} = 1;
1009 ban_node_by_slot($jobstep[$job]->{slotindex});
1018 my $task_success = shift;
1019 preprocess_stderr ($job);
1022 Log ($job, "stderr $_");
1023 } split ("\n", $jobstep[$job]->{stderr});
1029 my $whc = Warehouse->new;
1030 Log (undef, "collate");
1031 $whc->write_start (1);
1035 next if (!exists $_->{'arvados_task'}->{output} ||
1036 !$_->{'arvados_task'}->{'success'} ||
1037 $_->{'exitcode'} != 0);
1038 my $output = $_->{'arvados_task'}->{output};
1039 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1041 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1042 $whc->write_data ($output);
1044 elsif (@jobstep == 1)
1046 $joboutput = $output;
1049 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1051 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1052 $whc->write_data ($outblock);
1056 my $errstr = $whc->errstr;
1057 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1061 $joboutput = $whc->write_finish if !defined $joboutput;
1064 Log (undef, "output $joboutput");
1065 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1069 Log (undef, "output undef");
1079 my $sig = 2; # SIGINT first
1080 if (exists $proc{$_}->{"sent_$sig"} &&
1081 time - $proc{$_}->{"sent_$sig"} > 4)
1083 $sig = 15; # SIGTERM if SIGINT doesn't work
1085 if (exists $proc{$_}->{"sent_$sig"} &&
1086 time - $proc{$_}->{"sent_$sig"} > 4)
1088 $sig = 9; # SIGKILL if SIGTERM doesn't work
1090 if (!exists $proc{$_}->{"sent_$sig"})
1092 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1094 select (undef, undef, undef, 0.1);
1097 kill $sig, $_; # srun wants two SIGINT to really interrupt
1099 $proc{$_}->{"sent_$sig"} = time;
1100 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1110 vec($bits,fileno($_),1) = 1;
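# (sets this file descriptor's bit in the vector used by 4-argument select()
# to poll the reader pipes)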
1116 sub Log # ($jobstep_id, $logmessage)
1118 if ($_[1] =~ /\n/) {
1119 for my $line (split (/\n/, $_[1])) {
1124 my $fh = select STDERR; $|=1; select $fh;
1125 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1126 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
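# (escape any byte outside printable ASCII as \NNN octal so each log entry
# stays on a single line)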
1129 if ($metastream || -t STDERR) {
1130 my @gmtime = gmtime;
1131 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1132 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1134 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1136 return if !$metastream;
1137 $metastream->write_data ($datetime . " " . $message);
1143 my ($package, $file, $line) = caller;
1144 my $message = "@_ at $file line $line\n";
1145 Log (undef, $message);
1146 freeze() if @jobstep_todo;
1147 collate_output() if @jobstep_todo;
1149 save_meta() if $metastream;
1156 return if !$job_has_uuid;
1157 $Job->update_attributes('running' => 0,
1159 'finished_at' => scalar gmtime);
1165 my $justcheckpoint = shift; # false if this will be the last meta saved
1166 my $m = $metastream;
1167 $m = $m->copy if $justcheckpoint;
1169 my $loglocator = $m->as_key;
1170 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1171 Log (undef, "meta key is $loglocator");
1172 $Job->{'log'} = $loglocator;
1173 $Job->update_attributes('log' => $loglocator) if $job_has_uuid;
1177 sub freeze_if_want_freeze
1179 if ($main::please_freeze)
1181 release_allocation();
1184 # kill some srun procs before freeze+stop
1185 map { $proc{$_} = {} } @_;
1188 killem (keys %proc);
1189 select (undef, undef, undef, 0.1);
1191 while (($died = waitpid (-1, WNOHANG)) > 0)
1193 delete $proc{$died};
1208 Log (undef, "Freeze not implemented");
1215 croak ("Thaw not implemented");
1219 Log (undef, "thaw from $key");
1224 @jobstep_tomerge = ();
1225 $jobstep_tomerge_level = 0;
1228 my $stream = new Warehouse::Stream ( whc => $whc,
1229 hash => [split (",", $key)] );
1231 while (my $dataref = $stream->read_until (undef, "\n\n"))
1233 if ($$dataref =~ /^job /)
1235 foreach (split ("\n", $$dataref))
1237 my ($k, $v) = split ("=", $_, 2);
1238 $frozenjob->{$k} = freezeunquote ($v);
1243 if ($$dataref =~ /^merge (\d+) (.*)/)
1245 $jobstep_tomerge_level = $1;
1247 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1252 foreach (split ("\n", $$dataref))
1254 my ($k, $v) = split ("=", $_, 2);
1255 $Jobstep->{$k} = freezeunquote ($v) if $k;
1257 $Jobstep->{'failures'} = 0;
1258 push @jobstep, $Jobstep;
1260 if ($Jobstep->{exitcode} eq "0")
1262 push @jobstep_done, $#jobstep;
1266 push @jobstep_todo, $#jobstep;
1270 foreach (qw (script script_version script_parameters))
1272 $Job->{$_} = $frozenjob->{$_};
1274 $Job->save if $job_has_uuid;
1290 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
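# (undo the quoting applied when state was frozen: "\n" becomes a real
# newline, any other backslash-escaped character becomes itself)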
1297 my $srunargs = shift;
1298 my $execargs = shift;
1299 my $opts = shift || {};
1301 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1302 print STDERR (join (" ",
1303 map { / / ? "'$_'" : $_ }
1306 if $ENV{CRUNCH_DEBUG};
1308 if (defined $stdin) {
1309 my $child = open STDIN, "-|";
1310 defined $child or die "no fork: $!";
1312 print $stdin or die $!;
1313 close STDOUT or die $!;
1318 return system (@$args) if $opts->{fork};
1321 warn "ENV size is ".length(join(" ",%ENV));
1322 die "exec failed: $!: @$args";
1326 sub ban_node_by_slot {
1327 # Don't start any new jobsteps on this node for 60 seconds
1329 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1330 $slot[$slotid]->{node}->{hold_count}++;
1331 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1337 # checkout-and-build
1341 my $destdir = $ENV{"CRUNCH_SRC"};
1342 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1343 my $repo = $ENV{"CRUNCH_SRC_URL"};
1345 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1347 if (readlink ("$destdir.commit") eq $commit) {
1351 unlink "$destdir.commit";
1352 open STDOUT, ">", "$destdir.log";
1353 open STDERR, ">&STDOUT";
1356 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1359 die "'tar -C $destdir -xf -' exited $?: $!";
1363 chomp ($pwd = `pwd`);
1364 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1366 if (-e "$destdir/crunch_scripts/install") {
1367 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1368 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1370 shell_or_die ("./tests/autotests.sh", $install_dir);
1371 } elsif (-e "./install.sh") {
1372 shell_or_die ("./install.sh", $install_dir);
1376 unlink "$destdir.commit.new";
1377 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1378 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1387 if ($ENV{"DEBUG"}) {
1388 print STDERR "@_\n";
1391 or die "@_ failed: $! exit 0x".sprintf("%x",$?);