2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by the scheduler on the controller):
13 crunch-job --job x-y-z
15 Obtain job details from the command line, run tasks on the local machine
16 (typically invoked by an application or developer on a VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when the job finishes.
44 If the job succeeds, the job's output locator is printed on stdout.
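For example (an illustrative invocation only; x-y-z is a placeholder job
UUID as above), the output locator and the log can be captured separately:

 crunch-job --job x-y-z >output-locator.txt 2>crunch-job.log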
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or deallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81 if ($ENV{"USER"} ne "crunch" && $< != 0) {
82 # use a tmp dir unique for my uid
83 $ENV{"CRUNCH_TMP"} .= "-$<";
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
88 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
89 mkdir ($ENV{"JOB_WORK"});
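# Scratch layout set up above: everything lives under $CRUNCH_TMP (made
# unique per uid when not running as crunch or root); $JOB_WORK holds the
# per-task work areas and $CRUNCH_INSTALL receives the Python SDK
# virtualenv built later in this script.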
96 GetOptions('force-unlock' => \$force_unlock,
97 'git-dir=s' => \$git_dir,
99 'job-api-token=s' => \$job_api_token,
100 'resume-stash=s' => \$resume_stash,
103 if (defined $job_api_token) {
104 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
107 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
108 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
109 my $local_job = !$job_has_uuid;
114 $main::ENV{CRUNCH_DEBUG} = 1;
118 $main::ENV{CRUNCH_DEBUG} = 0;
123 my $arv = Arvados->new('apiVersion' => 'v1');
126 my $User = $arv->{'users'}->{'current'}->execute;
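# When given a job UUID, fetch the job record and refuse to run it if it
# already looks locked, finished, or started -- unless --force-unlock was
# given. A literal JSON jobspec is decoded and submitted as a new job
# record instead.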
134 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
135 if (!$force_unlock) {
136 if ($Job->{'is_locked_by_uuid'}) {
137 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
139 if (defined $Job->{'success'}) {
140 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
142 if ($Job->{'running'}) {
143 croak("Job 'running' flag is already set");
145 if ($Job->{'started_at'}) {
146 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
152 $Job = JSON::decode_json($jobspec);
156 map { croak ("No $_ specified") unless $Job->{$_} }
157 qw(script script_version script_parameters);
160 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
161 $Job->{'started_at'} = gmtime;
163 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
167 $job_id = $Job->{'uuid'};
169 $metastream = Warehouse::Stream->new(whc => new Warehouse);
171 $metastream->name('.');
172 $metastream->write_start($job_id . '.log.txt');
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
180 Log (undef, "check slurm allocation");
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
187 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188 push @sinfo, "$localcpus localhost";
190 if (exists $ENV{SLURM_NODELIST})
192 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
196 my ($ncpus, $slurm_nodelist) = split;
197 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
200 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
203 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
206 foreach (split (",", $ranges))
219 push @nodelist, map {
221 $n =~ s/\[[-,\d]+\]/$_/;
228 push @nodelist, $nodelist;
231 foreach my $nodename (@nodelist)
233 Log (undef, "node $nodename - $ncpus slots");
234 my $node = { name => $nodename,
238 foreach my $cpu (1..$ncpus)
240 push @slot, { node => $node,
244 push @node, @nodelist;
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
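# Sorting by cpu index interleaves the nodes, so slot 0 on every node is
# handed out before any node gets a second concurrent task.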
259 # Claim this job, and make sure nobody else does
260 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
261 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
262 croak("Error while updating / locking job");
264 $Job->update_attributes('started_at' => scalar gmtime,
267 'tasks_summary' => { 'failed' => 0,
274 Log (undef, "start");
275 $SIG{'INT'} = sub { $main::please_freeze = 1; };
276 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
277 $SIG{'TERM'} = \&croak;
278 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
279 $SIG{'ALRM'} = sub { $main::please_info = 1; };
280 $SIG{'CONT'} = sub { $main::please_continue = 1; };
281 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
283 $main::please_freeze = 0;
284 $main::please_info = 0;
285 $main::please_continue = 0;
286 $main::please_refresh = 0;
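# The signal handlers above only set these flags; the dispatch and wait
# loops below poll them and do the actual freeze/checkpoint/refresh work.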
287 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302 my $latest_refresh = scalar time;
306 if (defined $Job->{thawedfromkey})
308 thaw ($Job->{thawedfromkey});
312 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
313 'job_uuid' => $Job->{'uuid'},
318 push @jobstep, { 'level' => 0,
320 'arvados_task' => $first_task,
322 push @jobstep_todo, 0;
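# @jobstep holds one entry per task; @jobstep_todo and @jobstep_done hold
# indices into it. Task 0 (level 0) is created here, and any tasks it
# registers through the job_tasks API are queued by reapchildren() below.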
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
331 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
334 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
335 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
337 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
338 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
339 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
341 or croak ("setup.py in $src_path failed: exit ".($?>>8));
349 $build_script = <DATA>;
351 Log (undef, "Install revision ".$Job->{script_version});
352 my $nodelist = join(",", @node);
354 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
356 my $cleanpid = fork();
359 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
360 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
365 last if $cleanpid == waitpid (-1, WNOHANG);
366 freeze_if_want_freeze ($cleanpid);
367 select (undef, undef, undef, 0.1);
369 Log (undef, "Clean-work-dir exited $?");
371 # Install requested code version
374 my @srunargs = ("srun",
375 "--nodelist=$nodelist",
376 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
378 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
379 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
383 my $treeish = $Job->{'script_version'};
384 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
385 # Todo: let script_version specify repository instead of expecting
386 # parent process to figure it out.
387 $ENV{"CRUNCH_SRC_URL"} = $repo;
389 # Create/update our clone of the remote git repo
391 if (!-d $ENV{"CRUNCH_SRC"}) {
392 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
393 or croak ("git clone $repo failed: exit ".($?>>8));
394 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
396 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
398 # If this looks like a subversion r#, look for it in git-svn commit messages
400 if ($treeish =~ m{^\d{1,4}$}) {
401 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
403 if ($gitlog =~ /^[a-f0-9]{40}$/) {
405 Log (undef, "Using commit $commit for script_version $treeish");
409 # If that didn't work, try asking git to look it up as a tree-ish.
411 if (!defined $commit) {
413 my $cooked_treeish = $treeish;
414 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
415 # Looks like a git branch name -- make sure git knows it's
416 # relative to the remote repo
417 $cooked_treeish = "origin/$treeish";
420 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
422 if ($found =~ /^[0-9a-f]{40}$/s) {
424 if ($commit ne $treeish) {
425 # Make sure we record the real commit id in the database,
426 # frozentokey, logs, etc. -- instead of an abbreviation or a
427 # branch name which can become ambiguous or point to a
428 # different commit in the future.
429 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
430 Log (undef, "Using commit $commit for tree-ish $treeish");
431 if ($commit ne $treeish) {
432 $Job->{'script_version'} = $commit;
434 $Job->update_attributes('script_version' => $commit) or
435 croak("Error while updating job");
441 if (defined $commit) {
442 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
443 @execargs = ("sh", "-c",
444 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
445 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
448 croak ("could not figure out commit id for $treeish");
451 my $installpid = fork();
452 if ($installpid == 0)
454 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
459 last if $installpid == waitpid (-1, WNOHANG);
460 freeze_if_want_freeze ($installpid);
461 select (undef, undef, undef, 0.1);
463 Log (undef, "Install exited $?");
468 foreach (qw (script script_version script_parameters runtime_constraints))
472 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
474 foreach (split (/\n/, $Job->{knobs}))
476 Log (undef, "knob " . $_);
481 $main::success = undef;
487 my $thisround_succeeded = 0;
488 my $thisround_failed = 0;
489 my $thisround_failed_multiple = 0;
491 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
492 or $a <=> $b } @jobstep_todo;
493 my $level = $jobstep[$jobstep_todo[0]]->{level};
494 Log (undef, "start level $level");
499 my @freeslot = (0..$#slot);
502 my $progress_is_dirty = 1;
503 my $progress_stats_updated = 0;
505 update_progress_stats();
510 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
512 my $id = $jobstep_todo[$todo_ptr];
513 my $Jobstep = $jobstep[$id];
514 if ($Jobstep->{level} != $level)
519 pipe $reader{$id}, "writer" or croak ($!);
520 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
521 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
523 my $childslot = $freeslot[0];
524 my $childnode = $slot[$childslot]->{node};
525 my $childslotname = join (".",
526 $slot[$childslot]->{node}->{name},
527 $slot[$childslot]->{cpu});
528 my $childpid = fork();
531 $SIG{'INT'} = 'DEFAULT';
532 $SIG{'QUIT'} = 'DEFAULT';
533 $SIG{'TERM'} = 'DEFAULT';
535 foreach (values (%reader))
539 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
540 open(STDOUT,">&writer");
541 open(STDERR,">&writer");
546 delete $ENV{"GNUPGHOME"};
547 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
548 $ENV{"TASK_QSEQUENCE"} = $id;
549 $ENV{"TASK_SEQUENCE"} = $level;
550 $ENV{"JOB_SCRIPT"} = $Job->{script};
551 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
552 $param =~ tr/a-z/A-Z/;
553 $ENV{"JOB_PARAMETER_$param"} = $value;
555 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
556 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
557 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
558 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
559 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
560 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
561 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
567 "--nodelist=".$childnode->{name},
568 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
569 "--job-name=$job_id.$id.$$",
571 my @execargs = qw(sh);
572 my $build_script_to_send = "";
574 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
575 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
576 ."&& cd $ENV{CRUNCH_TMP} ";
579 $build_script_to_send = $build_script;
584 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
585 my @execargs = ('bash', '-c', $command);
586 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
590 if (!defined $childpid)
597 $proc{$childpid} = { jobstep => $id,
600 jobstepname => "$job_id.$id.$childpid",
602 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
603 $slot[$childslot]->{pid} = $childpid;
605 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
606 Log ($id, "child $childpid started on $childslotname");
607 $Jobstep->{starttime} = time;
608 $Jobstep->{node} = $childnode->{name};
609 $Jobstep->{slotindex} = $childslot;
610 delete $Jobstep->{stderr};
611 delete $Jobstep->{finishtime};
613 splice @jobstep_todo, $todo_ptr, 1;
616 $progress_is_dirty = 1;
620 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
622 last THISROUND if $main::please_freeze;
623 if ($main::please_info)
625 $main::please_info = 0;
629 update_progress_stats();
636 check_refresh_wanted();
638 update_progress_stats();
639 select (undef, undef, undef, 0.1);
641 elsif (time - $progress_stats_updated >= 30)
643 update_progress_stats();
645 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
646 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
648 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
649 .($thisround_failed+$thisround_succeeded)
650 .") -- giving up on this round";
651 Log (undef, $message);
655 # move slots from freeslot to holdslot (or back to freeslot) if necessary
656 for (my $i=$#freeslot; $i>=0; $i--) {
657 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
658 push @holdslot, (splice @freeslot, $i, 1);
661 for (my $i=$#holdslot; $i>=0; $i--) {
662 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
663 push @freeslot, (splice @holdslot, $i, 1);
667 # give up if no nodes are succeeding
668 if (!grep { $_->{node}->{losing_streak} == 0 &&
669 $_->{node}->{hold_count} < 4 } @slot) {
670 my $message = "Every node has failed -- giving up on this round";
671 Log (undef, $message);
678 push @freeslot, splice @holdslot;
679 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
682 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
685 if ($main::please_continue) {
686 $main::please_continue = 0;
689 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
693 check_refresh_wanted();
695 update_progress_stats();
696 select (undef, undef, undef, 0.1);
697 killem (keys %proc) if $main::please_freeze;
701 update_progress_stats();
702 freeze_if_want_freeze();
705 if (!defined $main::success)
708 $thisround_succeeded == 0 &&
709 ($thisround_failed == 0 || $thisround_failed > 4))
711 my $message = "stop because $thisround_failed tasks failed and none succeeded";
712 Log (undef, $message);
721 goto ONELEVEL if !defined $main::success;
724 release_allocation();
727 $Job->update_attributes('output' => &collate_output(),
729 'success' => $Job->{'output'} && $main::success,
730 'finished_at' => scalar gmtime)
733 if ($Job->{'output'})
736 my $manifest_text = capturex("whget", $Job->{'output'});
737 $arv->{'collections'}->{'create'}->execute('collection' => {
738 'uuid' => $Job->{'output'},
739 'manifest_text' => $manifest_text,
743 Log (undef, "Failed to register output manifest: $@");
747 Log (undef, "finish");
754 sub update_progress_stats
756 $progress_stats_updated = time;
757 return if !$progress_is_dirty;
758 my ($todo, $done, $running) = (scalar @jobstep_todo,
759 scalar @jobstep_done,
760 scalar @slot - scalar @freeslot - scalar @holdslot);
761 $Job->{'tasks_summary'} ||= {};
762 $Job->{'tasks_summary'}->{'todo'} = $todo;
763 $Job->{'tasks_summary'}->{'done'} = $done;
764 $Job->{'tasks_summary'}->{'running'} = $running;
766 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
768 Log (undef, "status: $done done, $running running, $todo todo");
769 $progress_is_dirty = 0;
776 my $pid = waitpid (-1, WNOHANG);
777 return 0 if $pid <= 0;
779 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
781 . $slot[$proc{$pid}->{slot}]->{cpu});
782 my $jobstepid = $proc{$pid}->{jobstep};
783 my $elapsed = time - $proc{$pid}->{time};
784 my $Jobstep = $jobstep[$jobstepid];
786 my $childstatus = $?;
787 my $exitvalue = $childstatus >> 8;
788 my $exitinfo = sprintf("exit %d signal %d%s",
791 ($childstatus & 128 ? ' core dump' : ''));
792 $Jobstep->{'arvados_task'}->reload;
793 my $task_success = $Jobstep->{'arvados_task'}->{success};
795 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
797 if (!defined $task_success) {
798 # task did not indicate one way or the other --> fail
799 $Jobstep->{'arvados_task'}->{success} = 0;
800 $Jobstep->{'arvados_task'}->save;
807 $temporary_fail ||= $Jobstep->{node_fail};
808 $temporary_fail ||= ($exitvalue == 111);
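# An exit status of 111 is treated as a temporary failure, like a reported
# node failure: the task is retried below instead of failing the whole job.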
811 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
813 # Check for signs of a failed or misconfigured node
814 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
815 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
816 # Don't count this against jobstep failure thresholds if this
817 # node is already suspected faulty and srun exited quickly
818 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
820 Log ($jobstepid, "blaming failure on suspect node " .
821 $slot[$proc{$pid}->{slot}]->{node}->{name});
822 $temporary_fail ||= 1;
824 ban_node_by_slot($proc{$pid}->{slot});
827 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
828 ++$Jobstep->{'failures'},
829 $temporary_fail ? 'temporary' : 'permanent',
832 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
833 # Give up on this task, and the whole job
835 $main::please_freeze = 1;
838 # Put this task back on the todo queue
839 push @jobstep_todo, $jobstepid;
841 $Job->{'tasks_summary'}->{'failed'}++;
845 ++$thisround_succeeded;
846 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
847 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
848 push @jobstep_done, $jobstepid;
849 Log ($jobstepid, "success in $elapsed seconds");
851 $Jobstep->{exitcode} = $childstatus;
852 $Jobstep->{finishtime} = time;
853 process_stderr ($jobstepid, $task_success);
854 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
856 close $reader{$jobstepid};
857 delete $reader{$jobstepid};
858 delete $slot[$proc{$pid}->{slot}]->{pid};
859 push @freeslot, $proc{$pid}->{slot};
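# Pick up any new tasks this child created through the job_tasks API while
# it was running, and append them to the todo queue.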
863 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
865 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
867 'order' => 'qsequence'
869 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
871 'level' => $arvados_task->{'sequence'},
873 'arvados_task' => $arvados_task
875 push @jobstep, $jobstep;
876 push @jobstep_todo, $#jobstep;
879 $progress_is_dirty = 1;
883 sub check_refresh_wanted
885 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
886 if (@stat && $stat[9] > $latest_refresh) {
887 $latest_refresh = scalar time;
889 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
890 for my $attr ('cancelled_at',
891 'cancelled_by_user_uuid',
892 'cancelled_by_client_uuid') {
893 $Job->{$attr} = $Job2->{$attr};
895 if ($Job->{'cancelled_at'}) {
896 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
897 " by user " . $Job->{cancelled_by_user_uuid});
899 $main::please_freeze = 1;
907 # return if the kill list was checked <4 seconds ago
908 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
912 $squeue_kill_checked = time;
914 # use killem() on procs whose killtime is reached
917 if (exists $proc{$_}->{killtime}
918 && $proc{$_}->{killtime} <= time)
924 # return if the squeue was checked <60 seconds ago
925 if (defined $squeue_checked && $squeue_checked > time - 60)
929 $squeue_checked = time;
933 # here is an opportunity to check for mysterious problems with local procs
937 # get a list of steps still running
938 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
940 if ($squeue[-1] ne "ok")
946 # which of my jobsteps are running, according to squeue?
950 if (/^(\d+)\.(\d+) (\S+)/)
952 if ($1 eq $ENV{SLURM_JOBID})
959 # which of my active child procs (>60s old) were not mentioned by squeue?
962 if ($proc{$_}->{time} < time - 60
963 && !exists $ok{$proc{$_}->{jobstepname}}
964 && !exists $proc{$_}->{killtime})
966 # kill this proc if it hasn't exited in 30 seconds
967 $proc{$_}->{killtime} = time + 30;
973 sub release_allocation
977 Log (undef, "release job allocation");
978 system "scancel $ENV{SLURM_JOBID}";
986 foreach my $job (keys %reader)
989 while (0 < sysread ($reader{$job}, $buf, 8192))
991 print STDERR $buf if $ENV{CRUNCH_DEBUG};
992 $jobstep[$job]->{stderr} .= $buf;
993 preprocess_stderr ($job);
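# Keep the per-task stderr buffer bounded: once it grows past 16 KiB,
# drop the oldest 8 KiB.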
994 if (length ($jobstep[$job]->{stderr}) > 16384)
996 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1005 sub preprocess_stderr
1009 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1011 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1012 Log ($job, "stderr $line");
1013 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1015 $main::please_freeze = 1;
1017 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1018 $jobstep[$job]->{node_fail} = 1;
1019 ban_node_by_slot($jobstep[$job]->{slotindex});
1028 my $task_success = shift;
1029 preprocess_stderr ($job);
1032 Log ($job, "stderr $_");
1033 } split ("\n", $jobstep[$job]->{stderr});
1039 my $whc = Warehouse->new;
1040 Log (undef, "collate");
1041 $whc->write_start (1);
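# Merge the outputs of all successful tasks into one collection: an output
# that is not a Keep locator is written as literal data; with exactly one
# task, its locator becomes the job output directly; otherwise each
# locator's block is fetched and concatenated into the new manifest.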
1045 next if (!exists $_->{'arvados_task'}->{output} ||
1046 !$_->{'arvados_task'}->{'success'} ||
1047 $_->{'exitcode'} != 0);
1048 my $output = $_->{'arvados_task'}->{output};
1049 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1051 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1052 $whc->write_data ($output);
1054 elsif (@jobstep == 1)
1056 $joboutput = $output;
1059 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1061 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1062 $whc->write_data ($outblock);
1066 my $errstr = $whc->errstr;
1067 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1071 $joboutput = $whc->write_finish if !defined $joboutput;
1074 Log (undef, "output $joboutput");
1075 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1079 Log (undef, "output undef");
1089 my $sig = 2; # SIGINT first
1090 if (exists $proc{$_}->{"sent_$sig"} &&
1091 time - $proc{$_}->{"sent_$sig"} > 4)
1093 $sig = 15; # SIGTERM if SIGINT doesn't work
1095 if (exists $proc{$_}->{"sent_$sig"} &&
1096 time - $proc{$_}->{"sent_$sig"} > 4)
1098 $sig = 9; # SIGKILL if SIGTERM doesn't work
1100 if (!exists $proc{$_}->{"sent_$sig"})
1102 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1104 select (undef, undef, undef, 0.1);
1107 kill $sig, $_; # srun wants two SIGINT to really interrupt
1109 $proc{$_}->{"sent_$sig"} = time;
1110 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1120 vec($bits,fileno($_),1) = 1;
1126 sub Log # ($jobstep_id, $logmessage)
1128 if ($_[1] =~ /\n/) {
1129 for my $line (split (/\n/, $_[1])) {
1134 my $fh = select STDERR; $|=1; select $fh;
1135 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1136 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1139 if ($metastream || -t STDERR) {
1140 my @gmtime = gmtime;
1141 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1142 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1144 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1146 return if !$metastream;
1147 $metastream->write_data ($datetime . " " . $message);
1153 my ($package, $file, $line) = caller;
1154 my $message = "@_ at $file line $line\n";
1155 Log (undef, $message);
1156 freeze() if @jobstep_todo;
1157 collate_output() if @jobstep_todo;
1159 save_meta() if $metastream;
1166 return if !$job_has_uuid;
1167 $Job->update_attributes('running' => 0,
1169 'finished_at' => scalar gmtime);
1175 my $justcheckpoint = shift; # false if this will be the last meta saved
1176 my $m = $metastream;
1177 $m = $m->copy if $justcheckpoint;
1179 my $whc = Warehouse->new;
1180 my $loglocator = $whc->store_block ($m->as_string);
1181 $arv->{'collections'}->{'create'}->execute('collection' => {
1182 'uuid' => $loglocator,
1183 'manifest_text' => $m->as_string,
1185 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1186 Log (undef, "log manifest is $loglocator");
1187 $Job->{'log'} = $loglocator;
1188 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1192 sub freeze_if_want_freeze
1194 if ($main::please_freeze)
1196 release_allocation();
1199 # kill some srun procs before freeze+stop
1200 map { $proc{$_} = {} } @_;
1203 killem (keys %proc);
1204 select (undef, undef, undef, 0.1);
1206 while (($died = waitpid (-1, WNOHANG)) > 0)
1208 delete $proc{$died};
1223 Log (undef, "Freeze not implemented");
1230 croak ("Thaw not implemented");
1234 Log (undef, "thaw from $key");
1239 @jobstep_tomerge = ();
1240 $jobstep_tomerge_level = 0;
1243 my $stream = new Warehouse::Stream ( whc => $whc,
1244 hash => [split (",", $key)] );
1246 while (my $dataref = $stream->read_until (undef, "\n\n"))
1248 if ($$dataref =~ /^job /)
1250 foreach (split ("\n", $$dataref))
1252 my ($k, $v) = split ("=", $_, 2);
1253 $frozenjob->{$k} = freezeunquote ($v);
1258 if ($$dataref =~ /^merge (\d+) (.*)/)
1260 $jobstep_tomerge_level = $1;
1262 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1267 foreach (split ("\n", $$dataref))
1269 my ($k, $v) = split ("=", $_, 2);
1270 $Jobstep->{$k} = freezeunquote ($v) if $k;
1272 $Jobstep->{'failures'} = 0;
1273 push @jobstep, $Jobstep;
1275 if ($Jobstep->{exitcode} eq "0")
1277 push @jobstep_done, $#jobstep;
1281 push @jobstep_todo, $#jobstep;
1285 foreach (qw (script script_version script_parameters))
1287 $Job->{$_} = $frozenjob->{$_};
1289 $Job->save if $job_has_uuid;
1305 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1312 my $srunargs = shift;
1313 my $execargs = shift;
1314 my $opts = shift || {};
1316 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1317 print STDERR (join (" ",
1318 map { / / ? "'$_'" : $_ }
1321 if $ENV{CRUNCH_DEBUG};
1323 if (defined $stdin) {
1324 my $child = open STDIN, "-|";
1325 defined $child or die "no fork: $!";
1327 print $stdin or die $!;
1328 close STDOUT or die $!;
1333 return system (@$args) if $opts->{fork};
1336 warn "ENV size is ".length(join(" ",%ENV));
1337 die "exec failed: $!: @$args";
1341 sub ban_node_by_slot {
1342 # Don't start any new jobsteps on this node for 60 seconds
1344 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1345 $slot[$slotid]->{node}->{hold_count}++;
1346 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1352 # checkout-and-build
1356 my $destdir = $ENV{"CRUNCH_SRC"};
1357 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1358 my $repo = $ENV{"CRUNCH_SRC_URL"};
1360 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1362 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1366 unlink "$destdir.commit";
1367 open STDOUT, ">", "$destdir.log";
1368 open STDERR, ">&STDOUT";
1371 my @git_archive_data = <DATA>;
1372 if (@git_archive_data) {
1373 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1374 print TARX @git_archive_data;
1376 die "'tar -C $destdir -xf -' exited $?: $!";
1381 chomp ($pwd = `pwd`);
1382 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1385 for my $src_path ("$destdir/arvados/sdk/python") {
1387 shell_or_die ("virtualenv", $install_dir);
1388 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1392 if (-e "$destdir/crunch_scripts/install") {
1393 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1394 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1396 shell_or_die ("./tests/autotests.sh", $install_dir);
1397 } elsif (-e "./install.sh") {
1398 shell_or_die ("./install.sh", $install_dir);
1402 unlink "$destdir.commit.new";
1403 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1404 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1413 if ($ENV{"DEBUG"}) {
1414 print STDERR "@_\n";
1417 or die "@_ failed: $! exit 0x".sprintf("%x",$?);