2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
# --- Environment setup: scratch directories shared with child tasks ---
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
# Default TMPDIR so CRUNCH_TMP always has a base.
77 $ENV{"TMPDIR"} ||= "/tmp";
78 unless (defined $ENV{"CRUNCH_TMP"}) {
79 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
80 if ($ENV{"USER"} ne "crunch" && $< != 0) {
81 # use a tmp dir unique for my uid
# $< is the real uid; ordinary (non-root, non-"crunch") users get a per-uid dir.
82 $ENV{"CRUNCH_TMP"} .= "-$<";
85 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
86 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
87 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# NOTE(review): mkdir's return value is unchecked; a failure here only
# surfaces later when tasks try to use $JOB_WORK.
88 mkdir ($ENV{"JOB_WORK"});
# --- Command-line options and run-mode detection ---
95 GetOptions('force-unlock' => \$force_unlock,
96 'git-dir=s' => \$git_dir,
98 'job-api-token=s' => \$job_api_token,
99 'resume-stash=s' => \$resume_stash,
# A token given on the command line overrides the inherited environment.
102 if (defined $job_api_token) {
103 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# We are "under slurm" only when both allocation variables are present.
106 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A jobspec of lowercase alphanumerics/dashes is treated as a job UUID to
# fetch from the API; anything else (e.g. a JSON blob) means a local job.
107 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
108 my $local_job = !$job_has_uuid;
113 $main::ENV{CRUNCH_DEBUG} = 1;
117 $main::ENV{CRUNCH_DEBUG} = 0;
# --- Connect to the API server and obtain the Job record ---
122 my $arv = Arvados->new('apiVersion' => 'v1');
125 my $User = $arv->{'users'}->{'current'}->execute;
# UUID case: fetch the existing job and refuse to run it if another
# crunch-job already claimed or finished it (unless --force-unlock).
133 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
134 if (!$force_unlock) {
135 if ($Job->{'is_locked_by_uuid'}) {
136 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# NOTE(review): "ne undef" is a string comparison against undef (warns under
# `use warnings`); the intended test is defined($Job->{'success'}) — confirm
# before changing.
138 if ($Job->{'success'} ne undef) {
139 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
141 if ($Job->{'running'}) {
142 croak("Job 'running' flag is already set");
144 if ($Job->{'started_at'}) {
145 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local case: the jobspec itself is a JSON job description.
151 $Job = JSON::decode_json($jobspec);
# NOTE(review): map in void context used purely for side effects; a for
# loop would be the idiomatic form.
155 map { croak ("No $_ specified") unless $Job->{$_} }
156 qw(script script_version script_parameters);
159 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
160 $Job->{'started_at'} = gmtime;
162 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
166 $job_id = $Job->{'uuid'};
# Remnant of the old Warehouse-based metadata log (disabled; see save_meta).
168 # $metastream = Warehouse::Stream->new(whc => new Warehouse);
169 # $metastream->clear;
170 # $metastream->name('.');
171 # $metastream->write_start($job_id . '.log.txt');
# Normalize runtime constraints; max_tasks_per_node==0 means "no limit".
174 $Job->{'runtime_constraints'} ||= {};
175 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
176 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# --- Discover allocated nodes and build the @node / @slot tables ---
179 Log (undef, "check slurm allocation");
182 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Always include localhost; fall back to 1 CPU if /proc/cpuinfo is unreadable.
186 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
187 push @sinfo, "$localcpus localhost";
189 if (exists $ENV{SLURM_NODELIST})
191 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Each sinfo line is "<cpus> <nodelist>"; cap cpus at max_tasks_per_node.
195 my ($ncpus, $slurm_nodelist) = split;
196 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Expand slurm's compressed hostlist syntax, e.g. "compute[1-3,5]".
199 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
202 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
205 foreach (split (",", $ranges))
218 push @nodelist, map {
220 $n =~ s/\[[-,\d]+\]/$_/;
227 push @nodelist, $nodelist;
# One slot per (node, cpu) pair; slots are the scheduling unit for tasks.
230 foreach my $nodename (@nodelist)
232 Log (undef, "node $nodename - $ncpus slots");
233 my $node = { name => $nodename,
237 foreach my $cpu (1..$ncpus)
239 push @slot, { node => $node,
243 push @node, @nodelist;
248 # Ensure that we get one jobstep running on each allocated node before
249 # we start overloading nodes with concurrent steps
251 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
258 # Claim this job, and make sure nobody else does
# NOTE(review): '==' compares UUID strings numerically (both sides coerce
# to 0, so this is effectively always true); 'eq' looks intended — confirm.
259 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
260 $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
261 croak("Error while updating / locking job");
263 $Job->update_attributes('started_at' => scalar gmtime,
266 'tasks_summary' => { 'failed' => 0,
273 Log (undef, "start");
# Signal handlers only set flags; the main loop polls them (see the POD:
# INT/QUIT/TSTP checkpoint-and-stop, ALRM info, CONT resume, HUP refresh).
274 $SIG{'INT'} = sub { $main::please_freeze = 1; };
275 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
276 $SIG{'TERM'} = \&croak;
277 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
278 $SIG{'ALRM'} = sub { $main::please_info = 1; };
279 $SIG{'CONT'} = sub { $main::please_continue = 1; };
280 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 $main::please_refresh = 0;
286 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy NOCACHE* knobs from the job record into the environment for tasks.
288 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
289 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
290 $ENV{"JOB_UUID"} = $job_id;
# --- Per-run scheduling state and the level-0 seed task ---
294 my @jobstep_todo = ();
295 my @jobstep_done = ();
296 my @jobstep_tomerge = ();
297 my $jobstep_tomerge_level = 0;
299 my $squeue_kill_checked;
300 my $output_in_keep = 0;
301 my $latest_refresh = scalar time;
# Resume from a frozen checkpoint if the job record carries one.
305 if (defined $Job->{thawedfromkey})
307 thaw ($Job->{thawedfromkey});
# Otherwise create the initial (sequence 0) task for this job.
311 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312 'job_uuid' => $Job->{'uuid'},
317 push @jobstep, { 'level' => 0,
319 'arvados_task' => $first_task,
321 push @jobstep_todo, 0;
# --- Script source install (local path case) and work-dir cleanup ---
328 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# Local jobs may point script_version at an absolute path; then no git
# checkout is needed and the SDK is built straight out of that tree.
330 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
333 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
334 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
336 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
337 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
338 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
340 or croak ("setup.py in $src_path failed: exit ".($?>>8));
# NOTE(review): <DATA> in scalar context reads a single line; presumably
# $/ is localized to undef nearby (not visible here) so the whole DATA
# section is slurped — verify.
348 $build_script = <DATA>;
350 Log (undef, "Install revision ".$Job->{script_version});
351 my $nodelist = join(",", @node);
353 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
# Forked child runs the cleanup via srun on every node; parent polls for
# its exit while honoring freeze requests.
355 my $cleanpid = fork();
358 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
359 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
364 last if $cleanpid == waitpid (-1, WNOHANG);
365 freeze_if_want_freeze ($cleanpid);
366 select (undef, undef, undef, 0.1);
368 Log (undef, "Clean-work-dir exited $?");
370 # Install requested code version
# Resolve $Job->{script_version} (branch, tag, abbreviated or svn-style
# revision) to a full commit id, then ship a git archive + build script
# to every node via srun.
373 my @srunargs = ("srun",
374 "--nodelist=$nodelist",
375 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
377 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
378 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
382 my $treeish = $Job->{'script_version'};
383 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
384 # Todo: let script_version specify repository instead of expecting
385 # parent process to figure it out.
386 $ENV{"CRUNCH_SRC_URL"} = $repo;
388 # Create/update our clone of the remote git repo
# NOTE(review): $repo and $ENV{CRUNCH_SRC} are interpolated into shell
# commands below without quoting; safe only while paths have no
# metacharacters.
390 if (!-d $ENV{"CRUNCH_SRC"}) {
391 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
392 or croak ("git clone $repo failed: exit ".($?>>8));
393 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
395 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
397 # If this looks like a subversion r#, look for it in git-svn commit messages
399 if ($treeish =~ m{^\d{1,4}$}) {
400 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
402 if ($gitlog =~ /^[a-f0-9]{40}$/) {
404 Log (undef, "Using commit $commit for script_version $treeish");
408 # If that didn't work, try asking git to look it up as a tree-ish.
410 if (!defined $commit) {
412 my $cooked_treeish = $treeish;
413 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
414 # Looks like a git branch name -- make sure git knows it's
415 # relative to the remote repo
416 $cooked_treeish = "origin/$treeish";
419 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
421 if ($found =~ /^[0-9a-f]{40}$/s) {
423 if ($commit ne $treeish) {
424 # Make sure we record the real commit id in the database,
425 # frozentokey, logs, etc. -- instead of an abbreviation or a
426 # branch name which can become ambiguous or point to a
427 # different commit in the future.
428 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
429 Log (undef, "Using commit $commit for tree-ish $treeish");
430 if ($commit ne $treeish) {
431 $Job->{'script_version'} = $commit;
433 $Job->update_attributes('script_version' => $commit) or
434 croak("Error while updating job");
# With a resolved commit, build the remote command: unpack the archive
# piped over stdin and run the embedded perl build script (DATA section).
440 if (defined $commit) {
441 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
442 @execargs = ("sh", "-c",
443 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
444 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
447 croak ("could not figure out commit id for $treeish");
# Fork the installer; parent polls for exit while honoring freezes.
450 my $installpid = fork();
451 if ($installpid == 0)
453 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
458 last if $installpid == waitpid (-1, WNOHANG);
459 freeze_if_want_freeze ($installpid);
460 select (undef, undef, undef, 0.1);
462 Log (undef, "Install exited $?");
# --- Log the job's defining attributes, then set up the next round ---
467 foreach (qw (script script_version script_parameters runtime_constraints))
471 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
473 foreach (split (/\n/, $Job->{knobs}))
475 Log (undef, "knob " . $_);
480 $main::success = undef;
# Per-round counters: a "round" processes all todo steps at one level.
486 my $thisround_succeeded = 0;
487 my $thisround_failed = 0;
488 my $thisround_failed_multiple = 0;
# Process lowest level first; ties broken by step index.
490 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
491 or $a <=> $b } @jobstep_todo;
492 my $level = $jobstep[$jobstep_todo[0]]->{level};
493 Log (undef, "start level $level");
498 my @freeslot = (0..$#slot);
501 my $progress_is_dirty = 1;
502 my $progress_stats_updated = 0;
504 update_progress_stats();
# --- THISROUND: launch each todo step of the current level in a free slot ---
509 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
511 my $id = $jobstep_todo[$todo_ptr];
512 my $Jobstep = $jobstep[$id];
# Only steps at the current level run this round.
513 if ($Jobstep->{level} != $level)
# Non-blocking pipe to collect the child's stdout/stderr.
# NOTE(review): "writer" is a legacy bareword filehandle, shared across
# iterations; kept as-is for compatibility with the fcntl/open dups below.
518 pipe $reader{$id}, "writer" or croak ($!);
519 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
520 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
522 my $childslot = $freeslot[0];
523 my $childnode = $slot[$childslot]->{node};
524 my $childslotname = join (".",
525 $slot[$childslot]->{node}->{name},
526 $slot[$childslot]->{cpu});
527 my $childpid = fork();
# Child branch: reset handlers, point stdout/stderr at the pipe, build
# the per-task environment, then exec the task via srun.
530 $SIG{'INT'} = 'DEFAULT';
531 $SIG{'QUIT'} = 'DEFAULT';
532 $SIG{'TERM'} = 'DEFAULT';
534 foreach (values (%reader))
538 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
539 open(STDOUT,">&writer");
540 open(STDERR,">&writer");
545 delete $ENV{"GNUPGHOME"};
546 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
547 $ENV{"TASK_QSEQUENCE"} = $id;
548 $ENV{"TASK_SEQUENCE"} = $level;
549 $ENV{"JOB_SCRIPT"} = $Job->{script};
550 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
551 $param =~ tr/a-z/A-Z/;
552 $ENV{"JOB_PARAMETER_$param"} = $value;
554 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
555 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
556 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
557 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
558 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
559 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
560 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun pins the task to one cpu on the chosen node.
566 "--nodelist=".$childnode->{name},
567 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
568 "--job-name=$job_id.$id.$$",
570 my @execargs = qw(sh);
571 my $build_script_to_send = "";
# Shell prologue: recreate the task work dirs, cd into CRUNCH_TMP.
573 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
574 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
575 ."&& cd $ENV{CRUNCH_TMP} ";
578 $build_script_to_send = $build_script;
# PYTHONPATH munging so tasks see the job's SDK checkout first.
582 $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
583 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
584 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/arvados/sdk/python:}; # xxx hack
585 $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
# The task runs under arv-mount so $TASK_KEEPMOUNT exposes Keep as files.
587 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
588 my @execargs = ('bash', '-c', $command);
589 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
593 if (!defined $childpid)
# Parent branch: record the child in %proc and mark slot/step state.
600 $proc{$childpid} = { jobstep => $id,
603 jobstepname => "$job_id.$id.$childpid",
605 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
606 $slot[$childslot]->{pid} = $childpid;
608 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
609 Log ($id, "child $childpid started on $childslotname");
610 $Jobstep->{starttime} = time;
611 $Jobstep->{node} = $childnode->{name};
612 $Jobstep->{slotindex} = $childslot;
613 delete $Jobstep->{stderr};
614 delete $Jobstep->{finishtime};
# Step is now in flight: remove from todo (and back up the pointer,
# since splice shifts the remaining entries down).
616 splice @jobstep_todo, $todo_ptr, 1;
619 $progress_is_dirty = 1;
# --- In-round waiting, failure thresholds, and end-of-job bookkeeping ---
# Wait here while slots are busy or the todo list for this level is drained.
623 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
625 last THISROUND if $main::please_freeze;
626 if ($main::please_info)
628 $main::please_info = 0;
632 update_progress_stats();
639 check_refresh_wanted();
641 update_progress_stats();
642 select (undef, undef, undef, 0.1);
# Refresh progress stats at least every 30 seconds even when idle.
644 elsif (time - $progress_stats_updated >= 30)
646 update_progress_stats();
# Abort the round if repeated failures dominate (thresholds: 8 with no
# successes, or 16 exceeding the success count).
648 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
649 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
651 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
652 .($thisround_failed+$thisround_succeeded)
653 .") -- giving up on this round";
654 Log (undef, $message);
658 # move slots from freeslot to holdslot (or back to freeslot) if necessary
659 for (my $i=$#freeslot; $i>=0; $i--) {
660 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
661 push @holdslot, (splice @freeslot, $i, 1);
664 for (my $i=$#holdslot; $i>=0; $i--) {
665 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
666 push @freeslot, (splice @holdslot, $i, 1);
670 # give up if no nodes are succeeding
671 if (!grep { $_->{node}->{losing_streak} == 0 &&
672 $_->{node}->{hold_count} < 4 } @slot) {
673 my $message = "Every node has failed -- giving up on this round";
674 Log (undef, $message);
# Round over: release held slots and clear per-node losing streaks.
681 push @freeslot, splice @holdslot;
682 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
685 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
688 if ($main::please_continue) {
689 $main::please_continue = 0;
692 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
696 check_refresh_wanted();
698 update_progress_stats();
699 select (undef, undef, undef, 0.1);
700 killem (keys %proc) if $main::please_freeze;
704 update_progress_stats();
705 freeze_if_want_freeze();
708 if (!defined $main::success)
711 $thisround_succeeded == 0 &&
712 ($thisround_failed == 0 || $thisround_failed > 4))
714 my $message = "stop because $thisround_failed tasks failed and none succeeded";
715 Log (undef, $message);
# Loop back for the next level until success/failure is decided.
724 goto ONELEVEL if !defined $main::success;
727 release_allocation();
# Record final output/success/finish time on the job record.
730 $Job->update_attributes('output' => &collate_output(),
732 'success' => $Job->{'output'} && $main::success,
733 'finished_at' => scalar gmtime)
# Register the output manifest as a collection so it is preserved.
736 if ($Job->{'output'})
739 my $manifest_text = `arv keep get $Job->{'output'}`;
740 $arv->{'collections'}->{'create'}->execute('collection' => {
741 'uuid' => $Job->{'output'},
742 'manifest_text' => $manifest_text,
746 Log (undef, "Failed to register output manifest: $@");
750 Log (undef, "finish");
# Push todo/done/running task counts to the job record and the log.
# No-op unless $progress_is_dirty was set since the last call.
757 sub update_progress_stats
759 $progress_stats_updated = time;
760 return if !$progress_is_dirty;
# "running" = slots that are neither free nor held.
761 my ($todo, $done, $running) = (scalar @jobstep_todo,
762 scalar @jobstep_done,
763 scalar @slot - scalar @freeslot - scalar @holdslot);
764 $Job->{'tasks_summary'} ||= {};
765 $Job->{'tasks_summary'}->{'todo'} = $todo;
766 $Job->{'tasks_summary'}->{'done'} = $done;
767 $Job->{'tasks_summary'}->{'running'} = $running;
769 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
771 Log (undef, "status: $done done, $running running, $todo todo");
772 $progress_is_dirty = 0;
# --- Child reaper: collect one exited task, classify success/failure,
# --- requeue or record it, and pick up any tasks it created.
# Returns 0 when no child has exited (WNOHANG).
779 my $pid = waitpid (-1, WNOHANG);
780 return 0 if $pid <= 0;
782 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
784 . $slot[$proc{$pid}->{slot}]->{cpu});
785 my $jobstepid = $proc{$pid}->{jobstep};
786 my $elapsed = time - $proc{$pid}->{time};
787 my $Jobstep = $jobstep[$jobstepid];
# $? here is the status from the waitpid above.
789 my $childstatus = $?;
790 my $exitvalue = $childstatus >> 8;
791 my $exitinfo = sprintf("exit %d signal %d%s",
794 ($childstatus & 128 ? ' core dump' : ''));
# Success is decided by the task record, not the exit code alone.
795 $Jobstep->{'arvados_task'}->reload;
796 my $task_success = $Jobstep->{'arvados_task'}->{success};
798 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
800 if (!defined $task_success) {
801 # task did not indicate one way or the other --> fail
802 $Jobstep->{'arvados_task'}->{success} = 0;
803 $Jobstep->{'arvados_task'}->save;
# Failure path: decide whether the failure is temporary (retryable).
# Exit code 111 is treated as a temporary failure by convention here.
810 $temporary_fail ||= $Jobstep->{node_fail};
811 $temporary_fail ||= ($exitvalue == 111);
814 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
816 # Check for signs of a failed or misconfigured node
817 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
818 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
819 # Don't count this against jobstep failure thresholds if this
820 # node is already suspected faulty and srun exited quickly
821 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
823 Log ($jobstepid, "blaming failure on suspect node " .
824 $slot[$proc{$pid}->{slot}]->{node}->{name});
825 $temporary_fail ||= 1;
827 ban_node_by_slot($proc{$pid}->{slot});
830 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
831 ++$Jobstep->{'failures'},
832 $temporary_fail ? 'temporary ' : 'permanent',
# Permanent failure, or 3 strikes: give up on the whole job.
835 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
836 # Give up on this task, and the whole job
838 $main::please_freeze = 1;
841 # Put this task back on the todo queue
842 push @jobstep_todo, $jobstepid;
844 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: reset the node's streak and mark the step done.
848 ++$thisround_succeeded;
849 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
850 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
851 push @jobstep_done, $jobstepid;
852 Log ($jobstepid, "success in $elapsed seconds");
854 $Jobstep->{exitcode} = $childstatus;
855 $Jobstep->{finishtime} = time;
856 process_stderr ($jobstepid, $task_success);
857 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the pipe and the slot.
859 close $reader{$jobstepid};
860 delete $reader{$jobstepid};
861 delete $slot[$proc{$pid}->{slot}]->{pid};
862 push @freeslot, $proc{$pid}->{slot};
# Enqueue any child tasks this task created, ordered by qsequence.
866 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
868 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
870 'order' => 'qsequence'
872 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
874 'level' => $arvados_task->{'sequence'},
876 'arvados_task' => $arvados_task
878 push @jobstep, $jobstep;
879 push @jobstep_todo, $#jobstep;
882 $progress_is_dirty = 1;
# Re-read cancellation attributes from the API when the refresh trigger
# file has been touched since our last check; freeze if the job was
# cancelled (see POD: SIGHUP / node-allocation refresh).
886 sub check_refresh_wanted
888 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# stat[9] is mtime; only refresh when the trigger is newer than last time.
889 if (@stat && $stat[9] > $latest_refresh) {
890 $latest_refresh = scalar time;
892 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
893 for my $attr ('cancelled_at',
894 'cancelled_by_user_uuid',
895 'cancelled_by_client_uuid') {
896 $Job->{$attr} = $Job2->{$attr};
898 if ($Job->{'cancelled_at'}) {
899 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
900 " by user " . $Job->{cancelled_by_user_uuid});
902 $main::please_freeze = 1;
# --- squeue reconciliation: kill local children slurm no longer knows about ---
910 # return if the kill list was checked <4 seconds ago
911 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
915 $squeue_kill_checked = time;
917 # use killem() on procs whose killtime is reached
920 if (exists $proc{$_}->{killtime}
921 && $proc{$_}->{killtime} <= time)
927 # return if the squeue was checked <60 seconds ago
928 if (defined $squeue_checked && $squeue_checked > time - 60)
932 $squeue_checked = time;
936 # here is an opportunity to check for mysterious problems with local procs
940 # get a list of steps still running
# Trailing "echo ok" marks a successful squeue run.
# NOTE(review): backtick lines keep their newlines, so $squeue[-1] is
# "ok\n" unless chomped somewhere not visible here — verify this check.
941 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
943 if ($squeue[-1] ne "ok")
949 # which of my jobsteps are running, according to squeue?
953 if (/^(\d+)\.(\d+) (\S+)/)
955 if ($1 eq $ENV{SLURM_JOBID})
962 # which of my active child procs (>60s old) were not mentioned by squeue?
965 if ($proc{$_}->{time} < time - 60
966 && !exists $ok{$proc{$_}->{jobstepname}}
967 && !exists $proc{$_}->{killtime})
969 # kill this proc if it hasn't exited in 30 seconds
970 $proc{$_}->{killtime} = time + 30;
# Cancel our slurm allocation (only meaningful when running under slurm).
976 sub release_allocation
980 Log (undef, "release job allocation");
981 system "scancel $ENV{SLURM_JOBID}";
# --- Drain task output pipes and handle stderr lines ---
# Non-blocking sysread from each child pipe; buffer per jobstep, trimmed
# to bound memory (drop the oldest 8k once past 16k).
989 foreach my $job (keys %reader)
992 while (0 < sysread ($reader{$job}, $buf, 8192))
994 print STDERR $buf if $ENV{CRUNCH_DEBUG};
995 $jobstep[$job]->{stderr} .= $buf;
996 preprocess_stderr ($job);
997 if (length ($jobstep[$job]->{stderr}) > 16384)
999 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer, log them, and
# watch for srun/slurm error patterns (expired allocation => freeze;
# node failure => mark step node_fail and ban the node's slot).
1008 sub preprocess_stderr
1012 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1014 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1015 Log ($job, "stderr $line");
1016 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1018 $main::please_freeze = 1;
1020 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1021 $jobstep[$job]->{node_fail} = 1;
1022 ban_node_by_slot($jobstep[$job]->{slotindex});
# Flush any remaining (newline-less) stderr for a finished jobstep.
1031 my $task_success = shift;
1032 preprocess_stderr ($job);
1035 Log ($job, "stderr $_");
1036 } split ("\n", $jobstep[$job]->{stderr});
# Fetch one data block from Keep via "arv keep get" (up to 64 MiB).
1042 my ($child_out, $child_in, $output_block);
1044 my $pid = open2($child_out, $child_in, 'arv', 'keep', 'get', $hash);
1045 sysread($child_out, $output_block, 64 * 1024 * 1024);
1047 return $output_block;
# --- Combine per-task outputs into the job's output manifest ---
# Streams each task's output (a manifest fragment or a Keep locator to
# resolve) into "arv keep put --raw" and records the resulting locator.
1052 Log (undef, "collate");
1054 my ($child_out, $child_in);
1055 my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
# Skip tasks that produced no output or did not succeed.
1059 next if (!exists $_->{'arvados_task'}->{output} ||
1060 !$_->{'arvados_task'}->{'success'} ||
1061 $_->{'exitcode'} != 0);
1062 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare block locator is manifest text: pipe as-is.
1063 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1065 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1066 print $child_in $output;
# Single-task job whose output is a locator: that locator IS the output.
1068 elsif (@jobstep == 1)
1070 $joboutput = $output;
# Otherwise resolve the locator to its block contents first.
1073 elsif (defined (my $outblock = fetch_block ($output)))
1075 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1076 print $child_in $outblock;
1080 print $child_in "XXX fetch_block($output) failed XXX\n";
# Read the locator "arv keep put" printed (if we streamed anything).
1084 if (!defined $joboutput) {
1085 my $s = IO::Select->new($child_out);
1086 sysread($child_out, $joboutput, 64 * 1024 * 1024) if $s->can_read(0);
1093 Log (undef, "output $joboutput");
1094 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1098 Log (undef, "output undef");
# --- killem: escalate signals to child pids (INT -> TERM -> KILL) ---
# Each signal gets 4 seconds before escalating to the next.
1108 my $sig = 2; # SIGINT first
1109 if (exists $proc{$_}->{"sent_$sig"} &&
1110 time - $proc{$_}->{"sent_$sig"} > 4)
1112 $sig = 15; # SIGTERM if SIGINT doesn't work
1114 if (exists $proc{$_}->{"sent_$sig"} &&
1115 time - $proc{$_}->{"sent_$sig"} > 4)
1117 $sig = 9; # SIGKILL if SIGTERM doesn't work
1119 if (!exists $proc{$_}->{"sent_$sig"})
1121 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1123 select (undef, undef, undef, 0.1);
1126 kill $sig, $_; # srun wants two SIGINT to really interrupt
1128 $proc{$_}->{"sent_$sig"} = time;
1129 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# fhbits helper: build a select() bit vector from filehandles.
1139 vec($bits,fileno($_),1) = 1;
# Emit one log line to stderr (and, historically, the Keep metastream).
# Multi-line messages are split and logged one line at a time.
1145 sub Log # ($jobstep_id, $logmessage)
1147 if ($_[1] =~ /\n/) {
1148 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered for the duration (select dance restores default).
1153 my $fh = select STDERR; $|=1; select $fh;
1154 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable bytes as octal so log lines stay one-per-line.
1155 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# Prefix a timestamp when writing to a terminal.
1158 if ($metastream || -t STDERR) {
1159 my @gmtime = gmtime;
1160 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1161 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1163 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
# Disabled Warehouse metastream output (see save_meta).
1165 # return if !$metastream;
1166 # $metastream->write_data ($datetime . " " . $message);
# --- croak override: log fatal errors, checkpoint, and clean up the job ---
1172 my ($package, $file, $line) = caller;
1173 my $message = "@_ at $file line $line\n";
1174 Log (undef, $message);
# Best-effort checkpoint before dying so work is not lost.
1175 freeze() if @jobstep_todo;
1176 collate_output() if @jobstep_todo;
1178 save_meta() if $metastream;
# cleanup: mark the job record not-running/finished (API jobs only).
1185 return if !$job_has_uuid;
1186 $Job->update_attributes('running' => 0,
1188 'finished_at' => scalar gmtime);
# save_meta: disabled Warehouse-based log-manifest upload, kept for
# reference until the Keep-based replacement lands.
1194 # my $justcheckpoint = shift; # false if this will be the last meta saved
1195 # my $m = $metastream;
1196 # $m = $m->copy if $justcheckpoint;
1198 # my $whc = Warehouse->new;
1199 # my $loglocator = $whc->store_block ($m->as_string);
1200 # $arv->{'collections'}->{'create'}->execute('collection' => {
1201 # 'uuid' => $loglocator,
1202 # 'manifest_text' => $m->as_string,
1204 # undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1205 # Log (undef, "log manifest is $loglocator");
1206 # $Job->{'log'} = $loglocator;
1207 # $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# If a freeze was requested (SIGINT/QUIT/TSTP), release the slurm
# allocation, kill the given srun pids, reap them, then checkpoint+stop.
1211 sub freeze_if_want_freeze
1213 if ($main::please_freeze)
1215 release_allocation();
1218 # kill some srun procs before freeze+stop
# Register the passed-in pids in %proc so killem() covers them too.
1219 map { $proc{$_} = {} } @_;
1222 killem (keys %proc);
1223 select (undef, undef, undef, 0.1);
# Reap everything that died so no zombies outlive the freeze.
1225 while (($died = waitpid (-1, WNOHANG)) > 0)
1227 delete $proc{$died};
# freeze/thaw: checkpointing is currently stubbed out; the commented code
# below is the old Warehouse-stream implementation kept for reference.
1242 Log (undef, "Freeze not implemented");
1249 croak ("Thaw not implemented");
1253 # Log (undef, "thaw from $key");
1256 # @jobstep_done = ();
1257 # @jobstep_todo = ();
1258 # @jobstep_tomerge = ();
1259 # $jobstep_tomerge_level = 0;
1260 # my $frozenjob = {};
1262 # my $stream = new Warehouse::Stream ( whc => $whc,
1263 # hash => [split (",", $key)] );
1265 # while (my $dataref = $stream->read_until (undef, "\n\n"))
1267 # if ($$dataref =~ /^job /)
1269 # foreach (split ("\n", $$dataref))
1271 # my ($k, $v) = split ("=", $_, 2);
1272 # $frozenjob->{$k} = freezeunquote ($v);
1277 # if ($$dataref =~ /^merge (\d+) (.*)/)
1279 # $jobstep_tomerge_level = $1;
1281 # = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1285 # my $Jobstep = { };
1286 # foreach (split ("\n", $$dataref))
1288 # my ($k, $v) = split ("=", $_, 2);
1289 # $Jobstep->{$k} = freezeunquote ($v) if $k;
1291 # $Jobstep->{'failures'} = 0;
1292 # push @jobstep, $Jobstep;
1294 # if ($Jobstep->{exitcode} eq "0")
1296 # push @jobstep_done, $#jobstep;
1300 # push @jobstep_todo, $#jobstep;
1304 # foreach (qw (script script_version script_parameters))
1306 # $Job->{$_} = $frozenjob->{$_};
1308 # $Job->save if $job_has_uuid;
# freezeunquote: reverse the freeze-file escaping (\\n -> newline, \\X -> X).
1324 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# srun (\@srunargs, \@execargs, \%opts, $stdin):
# run the command via srun when under slurm, or directly otherwise;
# optional $stdin is fed to the child through a piped fork.
1331 my $srunargs = shift;
1332 my $execargs = shift;
1333 my $opts = shift || {};
1335 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1336 print STDERR (join (" ",
1337 map { / / ? "'$_'" : $_ }
1340 if $ENV{CRUNCH_DEBUG};
1342 if (defined $stdin) {
# Fork a writer child whose stdout becomes our STDIN.
1343 my $child = open STDIN, "-|";
1344 defined $child or die "no fork: $!";
1346 print $stdin or die $!;
1347 close STDOUT or die $!;
# fork mode returns the exit status; otherwise we exec and never return.
1352 return system (@$args) if $opts->{fork};
1355 warn "ENV size is ".length(join(" ",%ENV));
1356 die "exec failed: $!: @$args";
# Temporarily stop scheduling on the node owning this slot after failures.
# (Slots on held nodes migrate to @holdslot until hold_until expires.)
1360 sub ban_node_by_slot {
1361 # Don't start any new jobsteps on this node for 60 seconds
1363 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
# hold_count feeds the "every node has failed" give-up check (>=4 bans).
1364 $slot[$slotid]->{node}->{hold_count}++;
1365 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# --- DATA section: the build script shipped to each node via srun ---
# Unpacks the git archive from its own DATA handle into CRUNCH_SRC,
# builds the Python SDK, runs the job's install hook, and records the
# installed commit in "$destdir.commit" (skips work if already current).
1371 # checkout-and-build
1375 my $destdir = $ENV{"CRUNCH_SRC"};
1376 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1377 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Lock file serializes concurrent installs on the same node.
# NOTE(review): bareword handle "L" and unchecked open of "$destdir.log"
# below are legacy style; three-arg lexical opens would be safer.
1379 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Already installed at this commit? Then nothing to do.
1381 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1385 unlink "$destdir.commit";
1386 open STDOUT, ">", "$destdir.log";
1387 open STDERR, ">&STDOUT";
# The git archive was appended after __DATA__ by the parent crunch-job.
1390 my @git_archive_data = <DATA>;
1391 if (@git_archive_data) {
1392 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1393 print TARX @git_archive_data;
1395 die "'tar -C $destdir -xf -' exited $?: $!";
1400 chomp ($pwd = `pwd`);
1401 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1404 for my $src_path ("$destdir/arvados/sdk/python") {
1406 shell_or_die ("virtualenv", $install_dir);
1407 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Prefer the job tree's own install hook; fall back to conventions.
1411 if (-e "$destdir/crunch_scripts/install") {
1412 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1413 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1415 shell_or_die ("./tests/autotests.sh", $install_dir);
1416 } elsif (-e "./install.sh") {
1417 shell_or_die ("./install.sh", $install_dir);
# Atomically record the installed commit via symlink + rename.
1421 unlink "$destdir.commit.new";
1422 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1423 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# shell_or_die: run a shell command, echo it when DEBUG, die on failure.
1432 if ($ENV{"DEBUG"}) {
1433 print STDERR "@_\n";
1436 or die "@_ failed: $! exit 0x".sprintf("%x",$?);