2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
the job finishes.
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
# Scratch-space setup: default TMPDIR to /tmp and, when CRUNCH_TMP is not
# already set, derive it from TMPDIR.  JOB_WORK, CRUNCH_INSTALL and the
# deprecated CRUNCH_WORK alias all live under CRUNCH_TMP.
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# Any user other than "crunch" or root (uid 0) gets a uid-suffixed directory.
81 if ($ENV{"USER"} ne "crunch" && $< != 0) {
82 # use a tmp dir unique for my uid
83 $ENV{"CRUNCH_TMP"} .= "-$<";
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
88 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# The return value of mkdir is not checked here, so a failure (including
# "already exists") goes unreported at this point.
89 mkdir ($ENV{"JOB_WORK"});
# Command-line option parsing.  Only part of the GetOptions spec is visible
# in this excerpt.
96 GetOptions('force-unlock' => \$force_unlock,
97 'git-dir=s' => \$git_dir,
99 'job-api-token=s' => \$job_api_token,
100 'resume-stash=s' => \$resume_stash,
# --job-api-token, when given, replaces ARVADOS_API_TOKEN in the environment
# of this process (and therefore of anything it spawns).
103 if (defined $job_api_token) {
104 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# Slurm is treated as available only when both SLURM_JOBID and
# SLURM_NODELIST exist in the environment.
107 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A job spec consisting solely of lowercase letters, digits and dashes is
# taken to be a job UUID; anything else makes this a "local" job.
108 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
109 my $local_job = !$job_has_uuid;
# CRUNCH_DEBUG is forced to 1 or to 0; the conditions selecting each
# assignment are not visible in this excerpt.
114 $main::ENV{CRUNCH_DEBUG} = 1;
118 $main::ENV{CRUNCH_DEBUG} = 0;
# API client handle, and the user record belonging to the current token.
123 my $arv = Arvados->new('apiVersion' => 'v1');
126 my $User = $arv->{'users'}->{'current'}->execute;
# Fetch the job record named by $jobspec.  Unless --force-unlock was given,
# refuse to proceed when the record shows it is locked, already has a
# result, is marked running, or already has a start time.
134 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
135 if (!$force_unlock) {
136 if ($Job->{'is_locked_by_uuid'}) {
137 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# The intent (see the message below) is "the flag is not null", so test
# definedness directly.  The previous `ne undef` was a string comparison
# against undef (i.e. the empty string), which warns under `use warnings`
# and does not express a null check.
139 if (defined $Job->{'success'}) {
140 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
142 if ($Job->{'running'}) {
143 croak("Job 'running' flag is already set");
145 if ($Job->{'started_at'}) {
146 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Job given inline: decode the JSON spec and insist on the three mandatory
# fields before creating a job record through the API.
152 $Job = JSON::decode_json($jobspec);
# map is used purely for its side effect (croak on the first missing field).
156 map { croak ("No $_ specified") unless $Job->{$_} }
157 qw(script script_version script_parameters);
# Mark the new record as locked by the current user and started now.
# NOTE(review): gmtime is evaluated in scalar context here, which yields a
# human-readable date string — confirm that is the format the API expects.
160 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
161 $Job->{'started_at'} = gmtime;
163 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
167 $job_id = $Job->{'uuid'};
# Log stream: Log() appends to $metastream, stored under "<job uuid>.log.txt".
169 $metastream = Warehouse::Stream->new(whc => new Warehouse);
171 $metastream->name('.');
172 $metastream->write_start($job_id . '.log.txt');
# max_tasks_per_node defaults to 0; the slot-building code below only applies
# the cap when it is non-zero.
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Build the list of nodes (@node) and execution slots (@slot) available to
# this job.  Each @sinfo entry is a "<ncpus> <nodelist>" line.
180 Log (undef, "check slurm allocation");
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Local fallback: count "processor" lines in /proc/cpuinfo, using 1 if the
# count comes back as zero.
187 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188 push @sinfo, "$localcpus localhost";
# With a slurm allocation, ask sinfo for "<cpus> <nodes>" lines instead.
190 if (exists $ENV{SLURM_NODELIST})
192 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
# Split one sinfo line and cap the per-node slot count at max_tasks_per_node
# when that constraint is non-zero.
196 my ($ncpus, $slurm_nodelist) = split;
197 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated element (optionally carrying a "[...]" range
# suffix) off the front of the node list per iteration.
200 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
# Bracketed numeric ranges such as "[1-3,7]" are expanded into individual
# node names; the expansion details are only partly visible here.
203 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
206 foreach (split (",", $ranges))
219 push @nodelist, map {
221 $n =~ s/\[[-,\d]+\]/$_/;
# No bracket expression: the element is a plain node name.
228 push @nodelist, $nodelist;
# One node record per name, and one slot per (node, cpu index) pair.
231 foreach my $nodename (@nodelist)
233 Log (undef, "node $nodename - $ncpus slots");
234 my $node = { name => $nodename,
238 foreach my $cpu (1..$ncpus)
240 push @slot, { node => $node,
244 push @node, @nodelist;
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
# Sorting by cpu index puts every node's first slot ahead of any node's
# second slot.
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259 # Claim this job, and make sure nobody else does
# UUIDs are strings, so the ownership check must use `eq`.  With numeric `==`
# both operands are numified first, so two different UUIDs can compare equal
# and the check would pass even if another user's UUID ended up in the lock
# field.
260 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
261 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
262 croak("Error while updating / locking job");
# Record the start time and an initial tasks_summary on the job record (the
# remaining attributes of this call are not visible in this excerpt).
264 $Job->update_attributes('started_at' => scalar gmtime,
267 'tasks_summary' => { 'failed' => 0,
274 Log (undef, "start");
# Signal handlers only set flags that the main loops poll; TERM is the
# exception and goes straight to croak.  Note that TSTP sets the same
# please_freeze flag as INT and QUIT.
275 $SIG{'INT'} = sub { $main::please_freeze = 1; };
276 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
277 $SIG{'TERM'} = \&croak;
278 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
279 $SIG{'ALRM'} = sub { $main::please_info = 1; };
280 $SIG{'CONT'} = sub { $main::please_continue = 1; };
281 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
283 $main::please_freeze = 0;
284 $main::please_info = 0;
285 $main::please_continue = 0;
286 $main::please_refresh = 0;
287 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Export every "NOCACHE...=value" line found in the job's knobs as an
# environment variable (grep is used only for its side effect).
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
# Scheduler bookkeeping: @jobstep_todo / @jobstep_done hold indexes into
# @jobstep.
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302 my $latest_refresh = scalar time;
# Either resume from a frozen state or create the first task (level 0) and
# queue it as jobstep 0.
306 if (defined $Job->{thawedfromkey})
308 thaw ($Job->{thawedfromkey});
312 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
313 'job_uuid' => $Job->{'uuid'},
318 push @jobstep, { 'level' => 0,
320 'arvados_task' => $first_task,
322 push @jobstep_todo, 0;
# Source selection.  A local job whose script_version is an absolute path
# skips the git-based install and uses that directory as CRUNCH_SRC.
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
331 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
334 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
# Create a virtualenv under CRUNCH_TMP/opt and install the Python SDK into it
# from either of the two candidate locations inside the source tree.
335 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
336 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
337 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python",
338 "$ENV{CRUNCH_SRC}/sdk/python") {
# NOTE(review): $src_path is interpolated into a shell command line; the
# guard deciding whether this runs for a given path is not visible here.
340 system ("cd $src_path && \$CRUNCH_TMP/opt/bin/python setup.py install")
342 or croak ("setup.py in $src_path failed: exit ".($?>>8));
# Read the build script from this file's DATA handle.
# NOTE(review): how much of DATA a single read returns depends on the input
# record separator, whose setting is not visible in this excerpt.
350 $build_script = <DATA>;
352 Log (undef, "Install revision ".$Job->{script_version});
353 my $nodelist = join(",", @node);
355 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
# The cleanup runs in a forked child via srun() on every node in $nodelist:
# it unmounts anything mounted under JOB_WORK, then removes the work, opt
# and src* directories.
357 my $cleanpid = fork();
360 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
361 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# Parent: poll for the child's exit every 0.1 s, honouring freeze requests
# in the meantime.
366 last if $cleanpid == waitpid (-1, WNOHANG);
367 freeze_if_want_freeze ($cleanpid);
368 select (undef, undef, undef, 0.1);
370 Log (undef, "Clean-work-dir exited $?");
372 # Install requested code version
# srun arguments used to run the installer on every allocated node.
375 my @srunargs = ("srun",
376 "--nodelist=$nodelist",
377 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
379 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
380 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
# The repository comes from --git-dir, falling back to the
# CRUNCH_DEFAULT_GIT_DIR environment variable.
384 my $treeish = $Job->{'script_version'};
385 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
386 # Todo: let script_version specify repository instead of expecting
387 # parent process to figure it out.
388 $ENV{"CRUNCH_SRC_URL"} = $repo;
390 # Create/update our clone of the remote git repo
392 if (!-d $ENV{"CRUNCH_SRC"}) {
393 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
394 or croak ("git clone $repo failed: exit ".($?>>8));
395 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
# Re-point origin at the configured URL and fetch.  The output and exit
# status of this command are discarded.
397 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
399 # If this looks like a subversion r#, look for it in git-svn commit messages
401 if ($treeish =~ m{^\d{1,4}$}) {
402 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
# Accept the result only if it is exactly one 40-hex-digit commit id.
404 if ($gitlog =~ /^[a-f0-9]{40}$/) {
406 Log (undef, "Using commit $commit for script_version $treeish");
410 # If that didn't work, try asking git to look it up as a tree-ish.
412 if (!defined $commit) {
414 my $cooked_treeish = $treeish;
415 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
416 # Looks like a git branch name -- make sure git knows it's
417 # relative to the remote repo
418 $cooked_treeish = "origin/$treeish";
# NOTE(review): $cooked_treeish derives from the job's script_version and is
# interpolated unquoted into a shell command — confirm it is trusted input.
421 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
423 if ($found =~ /^[0-9a-f]{40}$/s) {
425 if ($commit ne $treeish) {
426 # Make sure we record the real commit id in the database,
427 # frozentokey, logs, etc. -- instead of an abbreviation or a
428 # branch name which can become ambiguous or point to a
429 # different commit in the future.
430 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
431 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this repeats the `$commit ne $treeish` test seen a few lines
# above and looks redundant — confirm against the full file.
432 if ($commit ne $treeish) {
433 $Job->{'script_version'} = $commit;
435 $Job->update_attributes('script_version' => $commit) or
436 croak("Error while updating job");
# With a resolved commit, prepare the remote command (a perl reading its
# program from stdin) and capture a `git archive` of that commit to send
# along with the build script.
442 if (defined $commit) {
443 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
444 @execargs = ("sh", "-c",
445 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
446 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
449 croak ("could not figure out commit id for $treeish");
# Run the installer in a forked child: srun() is handed the build script
# followed by the git archive as the remote command's stdin.
452 my $installpid = fork();
453 if ($installpid == 0)
455 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Parent: poll for the child's exit every 0.1 s, honouring freeze requests.
460 last if $installpid == waitpid (-1, WNOHANG);
461 freeze_if_want_freeze ($installpid);
462 select (undef, undef, undef, 0.1);
464 Log (undef, "Install exited $?");
# Log the main job attributes (references are JSON-encoded) and each knob.
469 foreach (qw (script script_version script_parameters runtime_constraints))
473 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
475 foreach (split (/\n/, $Job->{knobs}))
477 Log (undef, "knob " . $_);
# $main::success stays undef until the job's outcome is known; a later
# `goto ONELEVEL if !defined $main::success` repeats the round logic.
482 $main::success = undef;
# Per-round counters.
488 my $thisround_succeeded = 0;
489 my $thisround_failed = 0;
490 my $thisround_failed_multiple = 0;
# Order pending steps by level, then by jobstep index; this round handles
# the lowest pending level.
492 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
493 or $a <=> $b } @jobstep_todo;
494 my $level = $jobstep[$jobstep_todo[0]]->{level};
495 Log (undef, "start level $level");
# Every slot starts the round free.
500 my @freeslot = (0..$#slot);
503 my $progress_is_dirty = 1;
504 my $progress_stats_updated = 0;
506 update_progress_stats();
# Dispatch loop: walk the todo list and start one child process per jobstep
# of the current level on the first free slot.
511 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
513 my $id = $jobstep_todo[$todo_ptr];
514 my $Jobstep = $jobstep[$id];
# Steps at another level are not started in this round (the branch body is
# not visible in this excerpt).
515 if ($Jobstep->{level} != $level)
# Pipe carrying the child's stdout/stderr back to us; the read end is made
# non-blocking.
520 pipe $reader{$id}, "writer" or croak ($!);
521 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
522 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
524 my $childslot = $freeslot[0];
525 my $childnode = $slot[$childslot]->{node};
526 my $childslotname = join (".",
527 $slot[$childslot]->{node}->{name},
528 $slot[$childslot]->{cpu});
529 my $childpid = fork();
# Child side: restore default signal handling, then point STDOUT and STDERR
# at the pipe's write end.
532 $SIG{'INT'} = 'DEFAULT';
533 $SIG{'QUIT'} = 'DEFAULT';
534 $SIG{'TERM'} = 'DEFAULT';
536 foreach (values (%reader))
540 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
541 open(STDOUT,">&writer");
542 open(STDERR,">&writer");
# Environment handed to the task.
547 delete $ENV{"GNUPGHOME"};
548 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
549 $ENV{"TASK_QSEQUENCE"} = $id;
550 $ENV{"TASK_SEQUENCE"} = $level;
551 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Each script parameter becomes JOB_PARAMETER_<NAME>, with a-z upcased.
552 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
553 $param =~ tr/a-z/A-Z/;
554 $ENV{"JOB_PARAMETER_$param"} = $value;
556 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
557 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
558 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
559 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
560 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
561 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
562 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun arguments (the start of this list is not visible): one task, one cpu,
# one node, pinned to the chosen node, with a job name of the form
# "<job uuid>.<step id>.<child pid>".
568 "--nodelist=".$childnode->{name},
569 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
570 "--job-name=$job_id.$id.$$",
572 my @execargs = qw(sh);
573 my $build_script_to_send = "";
# Shell command run on the node: recreate the task work directory, then
# change into CRUNCH_TMP.
575 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
576 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
577 ."&& cd $ENV{CRUNCH_TMP} ";
580 $build_script_to_send = $build_script;
# Prepend the source tree's SDK directories to PYTHONPATH and append a
# system-wide fallback.
584 $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
585 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
586 $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/arvados/sdk/python:}; # xxx hack
587 $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
# The task script is launched through arv-mount with TASK_KEEPMOUNT as the
# mount point.
589 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
# NOTE(review): @execargs is declared with `my` a second time here; if both
# declarations share a scope this masks the earlier one — confirm.
590 my @execargs = ('bash', '-c', $command);
591 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# Parent side: handle fork failure (body not visible), then record the child
# against its slot and jobstep.
595 if (!defined $childpid)
602 $proc{$childpid} = { jobstep => $id,
605 jobstepname => "$job_id.$id.$childpid",
# A slot must never have two live children.
607 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
608 $slot[$childslot]->{pid} = $childpid;
610 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
611 Log ($id, "child $childpid started on $childslotname");
# Reset per-attempt state on the jobstep.
612 $Jobstep->{starttime} = time;
613 $Jobstep->{node} = $childnode->{name};
614 $Jobstep->{slotindex} = $childslot;
615 delete $Jobstep->{stderr};
616 delete $Jobstep->{finishtime};
# The step is now running, so drop it from the todo list.
618 splice @jobstep_todo, $todo_ptr, 1;
621 $progress_is_dirty = 1;
# Throttling and end-of-round handling.  The first line below is the tail of
# a loop condition whose opening is not visible in this excerpt; its visible
# half is true when some slot is busy and no further todo entries remain
# after the current one.
625 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
627 last THISROUND if $main::please_freeze;
628 if ($main::please_info)
630 $main::please_info = 0;
634 update_progress_stats();
641 check_refresh_wanted();
643 update_progress_stats();
644 select (undef, undef, undef, 0.1);
# Refresh the progress stats at least every 30 seconds.
646 elsif (time - $progress_stats_updated >= 30)
648 update_progress_stats();
# Abandon the round when repeat failures dominate: at least 8 with no
# successes, or at least 16 and more than the number of successes.
650 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
651 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
653 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
654 .($thisround_failed+$thisround_succeeded)
655 .") -- giving up on this round";
656 Log (undef, $message);
660 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops iterate backwards so that splice does not disturb the indexes
# still to be visited.
661 for (my $i=$#freeslot; $i>=0; $i--) {
662 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
663 push @holdslot, (splice @freeslot, $i, 1);
666 for (my $i=$#holdslot; $i>=0; $i--) {
667 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
668 push @freeslot, (splice @holdslot, $i, 1);
672 # give up if no nodes are succeeding
# A node counts as healthy when its losing streak is 0 and it has been put
# on hold fewer than 4 times.
673 if (!grep { $_->{node}->{losing_streak} == 0 &&
674 $_->{node}->{hold_count} < 4 } @slot) {
675 my $message = "Every node has failed -- giving up on this round";
676 Log (undef, $message);
# End of dispatching: release all held slots and clear the losing streak of
# every node that has a free slot.
683 push @freeslot, splice @holdslot;
684 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
687 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
690 if ($main::please_continue) {
691 $main::please_continue = 0;
# On an info request: clear the flag, then checkpoint, collate output and
# save the log as a checkpoint (comma-chained under one postfix `if`).
694 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
698 check_refresh_wanted();
700 update_progress_stats();
701 select (undef, undef, undef, 0.1);
702 killem (keys %proc) if $main::please_freeze;
706 update_progress_stats();
707 freeze_if_want_freeze();
# Decide whether the job is finished; only part of this condition is visible
# in this excerpt.
710 if (!defined $main::success)
713 $thisround_succeeded == 0 &&
714 ($thisround_failed == 0 || $thisround_failed > 4))
716 my $message = "stop because $thisround_failed tasks failed and none succeeded";
717 Log (undef, $message);
# No verdict yet: run another round.
726 goto ONELEVEL if !defined $main::success;
# Wrap-up: release the allocation, record the final output and outcome on
# the job record, and register the output manifest as a collection.
729 release_allocation();
# NOTE(review): 'success' reads $Job->{'output'} in the same argument list
# that calls collate_output(); this relies on collate_output() having set
# that field by then — confirm.
732 $Job->update_attributes('output' => &collate_output(),
734 'success' => $Job->{'output'} && $main::success,
735 'finished_at' => scalar gmtime)
738 if ($Job->{'output'})
# Fetch the manifest text for the output locator with the external `whget`
# command and create a collection from it.
741 my $manifest_text = capturex("whget", $Job->{'output'});
742 $arv->{'collections'}->{'create'}->execute('collection' => {
743 'uuid' => $Job->{'output'},
744 'manifest_text' => $manifest_text,
# $@ is logged, so this presumably sits in the failure path of an eval that
# is not visible here.
748 Log (undef, "Failed to register output manifest: $@");
752 Log (undef, "finish");
# update_progress_stats: push todo/done/running counts to the job record and
# log them, but only when $progress_is_dirty is set.  The timestamp of the
# last call is recorded even when nothing is sent.
759 sub update_progress_stats
761 $progress_stats_updated = time;
762 return if !$progress_is_dirty;
# "running" is derived: every slot that is neither free nor on hold.
763 my ($todo, $done, $running) = (scalar @jobstep_todo,
764 scalar @jobstep_done,
765 scalar @slot - scalar @freeslot - scalar @holdslot);
766 $Job->{'tasks_summary'} ||= {};
767 $Job->{'tasks_summary'}->{'todo'} = $todo;
768 $Job->{'tasks_summary'}->{'done'} = $done;
769 $Job->{'tasks_summary'}->{'running'} = $running;
771 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
773 Log (undef, "status: $done done, $running running, $todo todo");
774 $progress_is_dirty = 0;
# Body of the child-reaping routine (its `sub` header is not visible in this
# excerpt).  Collects at most one exited child without blocking, classifies
# the outcome, updates node/slot bookkeeping and queues any tasks the
# finished task created.  Returns 0 when no child was reaped.
781 my $pid = waitpid (-1, WNOHANG);
782 return 0 if $pid <= 0;
784 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
786 . $slot[$proc{$pid}->{slot}]->{cpu});
787 my $jobstepid = $proc{$pid}->{jobstep};
788 my $elapsed = time - $proc{$pid}->{time};
789 my $Jobstep = $jobstep[$jobstepid];
# Decode the wait status: the exit code is in the high byte, and bit 128
# flags a core dump.
791 my $childstatus = $?;
792 my $exitvalue = $childstatus >> 8;
793 my $exitinfo = sprintf("exit %d signal %d%s",
796 ($childstatus & 128 ? ' core dump' : ''));
# The task's own success flag, re-read from the API, decides the outcome.
797 $Jobstep->{'arvados_task'}->reload;
798 my $task_success = $Jobstep->{'arvados_task'}->{success};
800 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
802 if (!defined $task_success) {
803 # task did not indicate one way or the other --> fail
804 $Jobstep->{'arvados_task'}->{success} = 0;
805 $Jobstep->{'arvados_task'}->save;
# A failure is "temporary" when a node failure was detected for this step or
# the child exited with code 111.
812 $temporary_fail ||= $Jobstep->{node_fail};
813 $temporary_fail ||= ($exitvalue == 111);
# Count steps that had already failed at least once before this attempt.
816 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
818 # Check for signs of a failed or misconfigured node
819 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
820 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
821 # Don't count this against jobstep failure thresholds if this
822 # node is already suspected faulty and srun exited quickly
823 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
825 Log ($jobstepid, "blaming failure on suspect node " .
826 $slot[$proc{$pid}->{slot}]->{node}->{name});
827 $temporary_fail ||= 1;
829 ban_node_by_slot($proc{$pid}->{slot});
# Note that the failure counter is incremented as a side effect of building
# this log message.
832 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
833 ++$Jobstep->{'failures'},
834 $temporary_fail ? 'temporary ' : 'permanent',
# Permanent failures, or a third failure of any kind, stop the whole job.
837 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
838 # Give up on this task, and the whole job
840 $main::please_freeze = 1;
843 # Put this task back on the todo queue
844 push @jobstep_todo, $jobstepid;
846 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: the node is healthy again, so clear its streak and hold.
850 ++$thisround_succeeded;
851 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
852 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
853 push @jobstep_done, $jobstepid;
854 Log ($jobstepid, "success in $elapsed seconds");
# Common epilogue: record the exit status and finish time, flush remaining
# stderr, and release the pipe and the slot.
856 $Jobstep->{exitcode} = $childstatus;
857 $Jobstep->{finishtime} = time;
858 process_stderr ($jobstepid, $task_success);
859 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
861 close $reader{$jobstepid};
862 delete $reader{$jobstepid};
863 delete $slot[$proc{$pid}->{slot}]->{pid};
864 push @freeslot, $proc{$pid}->{slot};
# Queue every task created by the one that just finished, in qsequence
# order; each new jobstep's level is the task's 'sequence'.
868 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
870 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
872 'order' => 'qsequence'
874 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
876 'level' => $arvados_task->{'sequence'},
878 'arvados_task' => $arvados_task
880 push @jobstep, $jobstep;
881 push @jobstep_todo, $#jobstep;
884 $progress_is_dirty = 1;
# check_refresh_wanted: when the file named by CRUNCH_REFRESH_TRIGGER has a
# modification time newer than the last refresh, re-read the job record and
# copy its cancellation attributes.  A cancelled job requests a freeze.
888 sub check_refresh_wanted
890 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the file's mtime; an empty @stat means the stat call failed.
891 if (@stat && $stat[9] > $latest_refresh) {
892 $latest_refresh = scalar time;
894 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
895 for my $attr ('cancelled_at',
896 'cancelled_by_user_uuid',
897 'cancelled_by_client_uuid') {
898 $Job->{$attr} = $Job2->{$attr};
900 if ($Job->{'cancelled_at'}) {
901 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
902 " by user " . $Job->{cancelled_by_user_uuid});
904 $main::please_freeze = 1;
# Body of the squeue watchdog (its `sub` header is not visible in this
# excerpt).  Rate-limited in two tiers: the kill list is examined at most
# every 4 seconds and squeue is queried at most every 60 seconds.
912 # return if the kill list was checked <4 seconds ago
913 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
917 $squeue_kill_checked = time;
919 # use killem() on procs whose killtime is reached
922 if (exists $proc{$_}->{killtime}
923 && $proc{$_}->{killtime} <= time)
929 # return if the squeue was checked <60 seconds ago
930 if (defined $squeue_checked && $squeue_checked > time - 60)
934 $squeue_checked = time;
938 # here is an opportunity to check for mysterious problems with local procs
942 # get a list of steps still running
# The trailing "&& echo ok" adds a sentinel line so a failed squeue can be
# told apart from an empty listing.
943 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
# NOTE(review): backtick output lines normally keep their newline; this test
# presumably relies on a chop/chomp on a line not visible here — confirm.
945 if ($squeue[-1] ne "ok")
951 # which of my jobsteps are running, according to squeue?
# Lines have the form "<jobid>.<stepid> <name>"; only those for this slurm
# job are of interest.
955 if (/^(\d+)\.(\d+) (\S+)/)
957 if ($1 eq $ENV{SLURM_JOBID})
964 # which of my active child procs (>60s old) were not mentioned by squeue?
967 if ($proc{$_}->{time} < time - 60
968 && !exists $ok{$proc{$_}->{jobstepname}}
969 && !exists $proc{$_}->{killtime})
971 # kill this proc if it hasn't exited in 30 seconds
972 $proc{$_}->{killtime} = time + 30;
# release_allocation: log the release and cancel this process's slurm job
# with scancel.  Any guard around these statements is not visible in this
# excerpt.
978 sub release_allocation
982 Log (undef, "release job allocation");
# The exit status of scancel is not checked.
983 system "scancel $ENV{SLURM_JOBID}";
# Body of the pipe-draining routine (its `sub` header is not visible in this
# excerpt).  Reads whatever is currently available from every child's pipe,
# appends it to that jobstep's stderr buffer and logs the complete lines.
991 foreach my $job (keys %reader)
# Read in chunks of up to 8192 bytes until sysread returns nothing more.
994 while (0 < sysread ($reader{$job}, $buf, 8192))
996 print STDERR $buf if $ENV{CRUNCH_DEBUG};
997 $jobstep[$job]->{stderr} .= $buf;
998 preprocess_stderr ($job);
# Bound the buffer: once more than 16384 bytes remain unconsumed, the oldest
# 8192 bytes are dropped.
999 if (length ($jobstep[$job]->{stderr}) > 16384)
1001 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: consume every complete line from the jobstep's stderr
# buffer, log it, and react to known srun error messages.  A trailing
# partial line stays in the buffer.
1010 sub preprocess_stderr
1014 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the matched line and its newline from the front of the buffer.
1016 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1017 Log ($job, "stderr $line");
# The slurm allocation has gone away: request a freeze.
1018 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1020 $main::please_freeze = 1;
# Node-level trouble: flag the step (making its failure temporary) and back
# off the node it ran on.
1022 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1023 $jobstep[$job]->{node_fail} = 1;
1024 ban_node_by_slot($jobstep[$job]->{slotindex});
# Body of the final stderr flush for a finished jobstep (its `sub` header
# and first argument are not visible in this excerpt).  Complete lines go
# through preprocess_stderr(); whatever is left in the buffer is then logged
# line by line.
1033 my $task_success = shift;
1034 preprocess_stderr ($job);
1037 Log ($job, "stderr $_");
1038 } split ("\n", $jobstep[$job]->{stderr});
# Body of the output-collation routine (its `sub` header is not visible in
# this excerpt).  Combines the outputs of all successful jobsteps into one
# job output and records it on the job record when the job has a UUID.
1044 my $whc = Warehouse->new;
1045 Log (undef, "collate");
1046 $whc->write_start (1);
# Skip steps with no output, without a success flag, or with a non-zero
# exit status.
1050 next if (!exists $_->{'arvados_task'}->{output} ||
1051 !$_->{'arvados_task'}->{'success'} ||
1052 $_->{'exitcode'} != 0);
1053 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare 32-hex-digit locator (with optional "+hint"
# suffixes) is written out verbatim.
1054 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1056 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1057 $whc->write_data ($output);
# With exactly one jobstep in total, its locator is used as the job output
# directly.
1059 elsif (@jobstep == 1)
1061 $joboutput = $output;
# Otherwise fetch the block behind the locator and append its contents.
1064 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1066 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1067 $whc->write_data ($outblock);
# Fetch failed: an error marker is written into the collated data rather
# than aborting.
1071 my $errstr = $whc->errstr;
1072 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1076 $joboutput = $whc->write_finish if !defined $joboutput;
1079 Log (undef, "output $joboutput");
1080 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1084 Log (undef, "output undef");
# Body of the escalating-kill routine (its `sub` header and enclosing loop
# are not visible in this excerpt).  For each pid the signal escalates from
# INT to TERM to KILL, moving on once the previous signal was sent more than
# 4 seconds ago.
1094 my $sig = 2; # SIGINT first
1095 if (exists $proc{$_}->{"sent_$sig"} &&
1096 time - $proc{$_}->{"sent_$sig"} > 4)
1098 $sig = 15; # SIGTERM if SIGINT doesn't work
1100 if (exists $proc{$_}->{"sent_$sig"} &&
1101 time - $proc{$_}->{"sent_$sig"} > 4)
1103 $sig = 9; # SIGKILL if SIGTERM doesn't work
# Each signal level is sent only once per process.
1105 if (!exists $proc{$_}->{"sent_$sig"})
1107 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1109 select (undef, undef, undef, 0.1);
1112 kill $sig, $_; # srun wants two SIGINT to really interrupt
# Record when this signal was sent and how long the process had been
# running at that point.
1114 $proc{$_}->{"sent_$sig"} = time;
1115 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Fragment of a helper whose surroundings are not visible in this excerpt:
# sets the bit for filehandle $_'s descriptor number in the $bits vector,
# presumably for use with select() — confirm.
1125 vec($bits,fileno($_),1) = 1;
# Log: write one log line to STDERR and, while $metastream exists, to the
# job's log stream.
#   $_[0]  jobstep id (undef for job-level messages)
#   $_[1]  message text
1131 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split up and handled one line at a time (the loop
# body is not visible in this excerpt).
1133 if ($_[1] =~ /\n/) {
1134 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1139 my $fh = select STDERR; $|=1; select $fh;
1140 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape every character outside the space-to-tilde range as a backslash
# followed by a three-digit octal code.
1141 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A UTC timestamp is only built when something will print it.
1144 if ($metastream || -t STDERR) {
1145 my @gmtime = gmtime;
1146 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1147 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp only when it is a terminal.
1149 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1151 return if !$metastream;
1152 $metastream->write_data ($datetime . " " . $message);
# Body of the fatal-error handler (its `sub` header is not visible in this
# excerpt).  Logs the message with the caller's file and line, then saves
# what it can: a checkpoint and collated output while steps are still
# pending, and the log while the log stream is still open.
1158 my ($package, $file, $line) = caller;
1159 my $message = "@_ at $file line $line\n";
1160 Log (undef, $message);
1161 freeze() if @jobstep_todo;
1162 collate_output() if @jobstep_todo;
1164 save_meta() if $metastream;
# Fragment of a routine whose `sub` header is not visible in this excerpt:
# for jobs that have a UUID, mark the job record as no longer running and
# stamp its finish time.  Local jobs are left untouched.
1171 return if !$job_has_uuid;
1172 $Job->update_attributes('running' => 0,
1174 'finished_at' => scalar gmtime);
# Body of the log-saving routine (its `sub` header is not visible in this
# excerpt).  Stores the log stream's manifest as a block, registers it as a
# collection and records the locator on the job.
1180 my $justcheckpoint = shift; # false if this will be the last meta saved
# For a checkpoint, work on a copy so the live stream keeps accepting data.
1181 my $m = $metastream;
1182 $m = $m->copy if $justcheckpoint;
1184 my $whc = Warehouse->new;
1185 my $loglocator = $whc->store_block ($m->as_string);
1186 $arv->{'collections'}->{'create'}->execute('collection' => {
1187 'uuid' => $loglocator,
1188 'manifest_text' => $m->as_string,
1190 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1191 Log (undef, "log manifest is $loglocator");
1192 $Job->{'log'} = $loglocator;
1193 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# freeze_if_want_freeze: when a freeze has been requested, release the
# allocation and kill all known child processes before stopping.  Any pids
# passed as arguments are added to the set of processes to kill.
1197 sub freeze_if_want_freeze
1199 if ($main::please_freeze)
1201 release_allocation();
1204 # kill some srun procs before freeze+stop
# Register the extra pids with an empty record so killem() will see them.
1205 map { $proc{$_} = {} } @_;
1208 killem (keys %proc);
1209 select (undef, undef, undef, 0.1);
# Reap whichever children have exited and forget them.
1211 while (($died = waitpid (-1, WNOHANG)) > 0)
1213 delete $proc{$died};
# Fragment of the checkpoint routine (its `sub` header is not visible in
# this excerpt): the only visible statement logs that freezing is not
# implemented.
1228 Log (undef, "Freeze not implemented");
# Body of the resume routine (its `sub` header is not visible in this
# excerpt).  It begins with an unconditional croak; if croak does not
# return, everything after it is dead code — confirm against the full file.
1235 croak ("Thaw not implemented");
1239 Log (undef, "thaw from $key");
1244 @jobstep_tomerge = ();
1245 $jobstep_tomerge_level = 0;
# The frozen state is read as a stream of records separated by blank lines.
1248 my $stream = new Warehouse::Stream ( whc => $whc,
1249 hash => [split (",", $key)] );
1251 while (my $dataref = $stream->read_until (undef, "\n\n"))
# "job" record: key=value lines describing the frozen job.
1253 if ($$dataref =~ /^job /)
1255 foreach (split ("\n", $$dataref))
1257 my ($k, $v) = split ("=", $_, 2);
1258 $frozenjob->{$k} = freezeunquote ($v);
# "merge <level> <list>" record: pending merge state.
1263 if ($$dataref =~ /^merge (\d+) (.*)/)
1265 $jobstep_tomerge_level = $1;
1267 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
# Any other record is a jobstep made of key=value lines.
1272 foreach (split ("\n", $$dataref))
1274 my ($k, $v) = split ("=", $_, 2);
1275 $Jobstep->{$k} = freezeunquote ($v) if $k;
# Restored steps start with a clean failure count; those that had exited
# with status "0" count as done and the rest are queued again.
1277 $Jobstep->{'failures'} = 0;
1278 push @jobstep, $Jobstep;
1280 if ($Jobstep->{exitcode} eq "0")
1282 push @jobstep_done, $#jobstep;
1286 push @jobstep_todo, $#jobstep;
# Carry the script identity over from the frozen job.
1290 foreach (qw (script script_version script_parameters))
1292 $Job->{$_} = $frozenjob->{$_};
1294 $Job->save if $job_has_uuid;
# Fragment of the unquoting helper (its `sub` header is not visible in this
# excerpt): replaces each backslash escape in $s, turning "\n" into a
# newline and any other "\x" into the bare character x.
1310 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Body of the command launcher (its `sub` header is not visible in this
# excerpt).
#   1st argument: array ref of srun arguments (ignored when slurm is absent)
#   2nd argument: array ref holding the command to execute
#   3rd argument: optional hash ref of options; only `fork` is consulted in
#                 the visible code
# With slurm the command is wrapped in srun; otherwise it runs directly.
1317 my $srunargs = shift;
1318 my $execargs = shift;
1319 my $opts = shift || {};
1321 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Debug trace of the command line, single-quoting arguments that contain a
# space.
1322 print STDERR (join (" ",
1323 map { / / ? "'$_'" : $_ }
1326 if $ENV{CRUNCH_DEBUG};
# When stdin data is supplied, fork a helper whose STDOUT becomes this
# process's STDIN and which writes the data into it.  The assignment of
# $stdin is not visible in this excerpt.
1328 if (defined $stdin) {
1329 my $child = open STDIN, "-|";
1330 defined $child or die "no fork: $!";
1332 print $stdin or die $!;
1333 close STDOUT or die $!;
# With the `fork` option, run the command as a subprocess and return its
# status.
1338 return system (@$args) if $opts->{fork};
# Reached only when the command could not be executed (the exec call itself
# is presumably on a line not visible here).
1341 warn "ENV size is ".length(join(" ",%ENV));
1342 die "exec failed: $!: @$args";
# ban_node_by_slot: put the node that owns slot $slotid on hold for 60
# seconds and count the hold.  The argument unpacking is not visible in this
# excerpt.
1346 sub ban_node_by_slot {
1347 # Don't start any new jobsteps on this node for 60 seconds
1349 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1350 $slot[$slotid]->{node}->{hold_count}++;
1351 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1357 # checkout-and-build
# Stand-alone install script; presumably the DATA section that the main
# program reads and sends to the compute nodes — confirm.  It is driven
# entirely by the CRUNCH_SRC, CRUNCH_SRC_COMMIT, CRUNCH_SRC_URL and
# CRUNCH_INSTALL environment variables.
1361 my $destdir = $ENV{"CRUNCH_SRC"};
1362 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1363 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Lock file next to the destination directory (the locking call itself is
# not visible in this excerpt).
1365 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# "<destdir>.commit" is a symlink whose target is the installed commit id;
# if it already names the requested commit there is nothing to rebuild.
1367 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
# Otherwise invalidate the marker and send all further output to
# "<destdir>.log".
1371 unlink "$destdir.commit";
1372 open STDOUT, ">", "$destdir.log";
1373 open STDERR, ">&STDOUT";
# Unpack a tar stream into the destination directory.
1376 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1379 die "'tar -C $destdir -xf -' exited $?: $!";
1383 chomp ($pwd = `pwd`);
1384 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Create the virtualenv and install the Python SDK from either candidate
# path inside the source tree.
1387 shell_or_die ("virtualenv", $install_dir);
1388 for my $src_path ("$destdir/arvados/sdk/python", "$destdir/sdk/python") {
1390 shell_or_die ("cd $src_path && $install_dir/bin/python setup.py install");
# Run the tree's own installer if it has one, trying crunch_scripts/install,
# then tests/autotests.sh (only when there is no install.sh), then
# install.sh.
1394 if (-e "$destdir/crunch_scripts/install") {
1395 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1396 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1398 shell_or_die ("./tests/autotests.sh", $install_dir);
1399 } elsif (-e "./install.sh") {
1400 shell_or_die ("./install.sh", $install_dir);
# Publish the new marker by creating a temporary symlink and renaming it
# over the final name.
1404 unlink "$destdir.commit.new";
1405 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1406 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# Fragment of the command-running helper used by the install script (its
# `sub` header and the call that runs the command are not visible in this
# excerpt).  With DEBUG set, the command is echoed to STDERR first; a failed
# command is fatal.
1415 if ($ENV{"DEBUG"}) {
1416 print STDERR "@_\n";
1419 or die "@_ failed: $! exit 0x".sprintf("%x",$?);