2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 to or removed from the allocation) and the attributes of the Job record
68 that affect behavior (e.g., cancel the job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88 if ($ENV{"USER"} ne "crunch" && $< != 0) {
89 # use a tmp dir unique for my uid
90 $ENV{"CRUNCH_TMP"} .= "-$<";
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
104 GetOptions('force-unlock' => \$force_unlock,
105 'git-dir=s' => \$git_dir,
106 'job=s' => \$jobspec,
107 'job-api-token=s' => \$job_api_token,
108 'no-clear-tmp' => \$no_clear_tmp,
109 'resume-stash=s' => \$resume_stash,
112 if (defined $job_api_token) {
113 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# $have_slurm is true only when both SLURM_JOBID and SLURM_NODELIST are
# present in the environment.
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A --job argument made up solely of lowercase letters, digits and hyphens
# is treated as a job UUID; anything else marks the job as local.
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
123 $main::ENV{CRUNCH_DEBUG} = 1;
127 $main::ENV{CRUNCH_DEBUG} = 0;
132 my $arv = Arvados->new('apiVersion' => 'v1');
135 my $User = $arv->{'users'}->{'current'}->execute;
143 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144 if (!$force_unlock) {
145 if ($Job->{'is_locked_by_uuid'}) {
146 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# Refuse to run a job whose 'success' attribute has already been set to
# any value (true or false).  defined() is the correct null test; the old
# `ne undef` was a string comparison against "" that warned and treated an
# empty-string value as null.
148 if (defined $Job->{'success'}) {
149 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
151 if ($Job->{'running'}) {
152 croak("Job 'running' flag is already set");
154 if ($Job->{'started_at'}) {
155 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
161 $Job = JSON::decode_json($jobspec);
# Required job attributes: croak for each one that is missing or false.
# (A plain loop replaces map-in-void-context; the check itself is unchanged.)
165 croak ("No $_ specified")
166 for grep { !$Job->{$_} } qw(script script_version script_parameters);
169 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170 $Job->{'started_at'} = gmtime;
172 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
176 $job_id = $Job->{'uuid'};
178 my $keep_logfile = $job_id . '.log.txt';
179 my $local_logfile = File::Temp->new();
181 $Job->{'runtime_constraints'} ||= {};
182 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
183 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
186 Log (undef, "check slurm allocation");
189 # TODO: consider using $ENV{SLURM_TASKS_PER_NODE} instead of sinfo (e.g., "4(x3),2,4(x2)").
193 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
194 push @sinfo, "$localcpus localhost";
196 if (exists $ENV{SLURM_NODELIST})
198 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
202 my ($ncpus, $slurm_nodelist) = split;
203 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
206 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
209 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
212 foreach (split (",", $ranges))
225 push @nodelist, map {
227 $n =~ s/\[[-,\d]+\]/$_/;
234 push @nodelist, $nodelist;
237 foreach my $nodename (@nodelist)
239 Log (undef, "node $nodename - $ncpus slots");
240 my $node = { name => $nodename,
244 foreach my $cpu (1..$ncpus)
246 push @slot, { node => $node,
250 push @node, @nodelist;
255 # Ensure that we get one jobstep running on each allocated node before
256 # we start overloading nodes with concurrent steps
258 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
265 # Claim this job, and make sure nobody else does
# Claim the job, then verify that the lock really is ours.  UUIDs are
# strings, so they must be compared with `eq`: numeric `==` converts both
# sides to numbers (typically 0 for a UUID), which made the check succeed
# even when another user's UUID held the lock.
266 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
267 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
268 croak("Error while updating / locking job");
270 $Job->update_attributes('started_at' => scalar gmtime,
273 'tasks_summary' => { 'failed' => 0,
280 Log (undef, "start");
# Install signal handlers.  Every handler except TERM only raises a
# $main::please_* flag; TERM calls croak() directly.
#   INT, QUIT, TSTP -> please_freeze
#   ALRM            -> please_info
#   CONT            -> please_continue
#   HUP             -> please_refresh
281 $SIG{'INT'} = sub { $main::please_freeze = 1; };
282 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
283 $SIG{'TERM'} = \&croak;
284 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
285 $SIG{'ALRM'} = sub { $main::please_info = 1; };
286 $SIG{'CONT'} = sub { $main::please_continue = 1; };
287 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
289 $main::please_freeze = 0;
290 $main::please_info = 0;
291 $main::please_continue = 0;
292 $main::please_refresh = 0;
293 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every "NOCACHE...=value" line of the job's knobs into %ENV.
# (foreach instead of grep-in-void-context; same lines, same assignments.)
295 foreach (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/; }
296 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
297 $ENV{"JOB_UUID"} = $job_id;
301 my @jobstep_todo = ();
302 my @jobstep_done = ();
303 my @jobstep_tomerge = ();
304 my $jobstep_tomerge_level = 0;
306 my $squeue_kill_checked;
307 my $output_in_keep = 0;
308 my $latest_refresh = scalar time;
312 if (defined $Job->{thawedfromkey})
314 thaw ($Job->{thawedfromkey});
318 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
319 'job_uuid' => $Job->{'uuid'},
324 push @jobstep, { 'level' => 0,
326 'arvados_task' => $first_task,
328 push @jobstep_todo, 0;
334 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
341 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
343 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
346 if (!defined $no_clear_tmp) {
347 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
348 system($clear_tmp_cmd) == 0
349 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
351 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
352 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
354 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
355 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
356 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
358 or croak ("setup.py in $src_path failed: exit ".($?>>8));
366 $build_script = <DATA>;
368 Log (undef, "Install revision ".$Job->{script_version});
369 my $nodelist = join(",", @node);
371 if (!defined $no_clear_tmp) {
372 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
374 my $cleanpid = fork();
377 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
378 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
383 last if $cleanpid == waitpid (-1, WNOHANG);
384 freeze_if_want_freeze ($cleanpid);
385 select (undef, undef, undef, 0.1);
387 Log (undef, "Clean-work-dir exited $?");
390 # Install requested code version
393 my @srunargs = ("srun",
394 "--nodelist=$nodelist",
395 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
397 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
398 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
402 my $treeish = $Job->{'script_version'};
403 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
404 # Todo: let script_version specify repository instead of expecting
405 # parent process to figure it out.
406 $ENV{"CRUNCH_SRC_URL"} = $repo;
408 if (-d "$repo/.git") {
409 # We were given a working directory, but we are only interested in
411 $repo = "$repo/.git";
414 # If this looks like a subversion r#, look for it in git-svn commit messages
416 if ($treeish =~ m{^\d{1,4}$}) {
417 my $gitlog = `git --git-dir="$repo" log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " master`;
419 if ($gitlog =~ /^[a-f0-9]{40}$/) {
421 Log (undef, "Using commit $commit for script_version $treeish");
425 # If that didn't work, try asking git to look it up as a tree-ish.
427 if (!defined $commit) {
428 my $found = `git --git-dir="$repo" rev-list -1 "$treeish"`;
430 if ($found =~ /^[0-9a-f]{40}$/s) {
432 if ($commit ne $treeish) {
433 # Make sure we record the real commit id in the database,
434 # frozentokey, logs, etc. -- instead of an abbreviation or a
435 # branch name which can become ambiguous or point to a
436 # different commit in the future.
437 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
438 Log (undef, "Using commit $commit for tree-ish $treeish");
439 if ($commit ne $treeish) {
440 $Job->{'script_version'} = $commit;
442 $Job->update_attributes('script_version' => $commit) or
443 croak("Error while updating job");
449 if (defined $commit) {
450 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
451 @execargs = ("sh", "-c",
452 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
453 $git_archive = `git --git-dir="$repo" archive "$commit"`;
456 croak ("could not figure out commit id for $treeish");
459 my $installpid = fork();
460 if ($installpid == 0)
462 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
467 last if $installpid == waitpid (-1, WNOHANG);
468 freeze_if_want_freeze ($installpid);
469 select (undef, undef, undef, 0.1);
471 Log (undef, "Install exited $?");
476 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
477 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
482 foreach (qw (script script_version script_parameters runtime_constraints))
486 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
488 foreach (split (/\n/, $Job->{knobs}))
490 Log (undef, "knob " . $_);
495 $main::success = undef;
501 my $thisround_succeeded = 0;
502 my $thisround_failed = 0;
503 my $thisround_failed_multiple = 0;
505 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
506 or $a <=> $b } @jobstep_todo;
507 my $level = $jobstep[$jobstep_todo[0]]->{level};
508 Log (undef, "start level $level");
513 my @freeslot = (0..$#slot);
516 my $progress_is_dirty = 1;
517 my $progress_stats_updated = 0;
519 update_progress_stats();
524 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
526 my $id = $jobstep_todo[$todo_ptr];
527 my $Jobstep = $jobstep[$id];
528 if ($Jobstep->{level} != $level)
533 pipe $reader{$id}, "writer" or croak ($!);
534 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
535 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
537 my $childslot = $freeslot[0];
538 my $childnode = $slot[$childslot]->{node};
539 my $childslotname = join (".",
540 $slot[$childslot]->{node}->{name},
541 $slot[$childslot]->{cpu});
542 my $childpid = fork();
545 $SIG{'INT'} = 'DEFAULT';
546 $SIG{'QUIT'} = 'DEFAULT';
547 $SIG{'TERM'} = 'DEFAULT';
549 foreach (values (%reader))
553 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
554 open(STDOUT,">&writer");
555 open(STDERR,">&writer");
560 delete $ENV{"GNUPGHOME"};
561 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
562 $ENV{"TASK_QSEQUENCE"} = $id;
563 $ENV{"TASK_SEQUENCE"} = $level;
564 $ENV{"JOB_SCRIPT"} = $Job->{script};
565 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
566 $param =~ tr/a-z/A-Z/;
567 $ENV{"JOB_PARAMETER_$param"} = $value;
569 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
570 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
571 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
572 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
573 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
574 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
575 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
581 "--nodelist=".$childnode->{name},
582 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
583 "--job-name=$job_id.$id.$$",
585 my @execargs = qw(sh);
586 my $build_script_to_send = "";
588 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
589 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
590 ."&& cd $ENV{CRUNCH_TMP} ";
593 $build_script_to_send = $build_script;
598 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
599 my @execargs = ('bash', '-c', $command);
600 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
604 if (!defined $childpid)
611 $proc{$childpid} = { jobstep => $id,
614 jobstepname => "$job_id.$id.$childpid",
616 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
617 $slot[$childslot]->{pid} = $childpid;
619 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
620 Log ($id, "child $childpid started on $childslotname");
621 $Jobstep->{starttime} = time;
622 $Jobstep->{node} = $childnode->{name};
623 $Jobstep->{slotindex} = $childslot;
624 delete $Jobstep->{stderr};
625 delete $Jobstep->{finishtime};
627 splice @jobstep_todo, $todo_ptr, 1;
630 $progress_is_dirty = 1;
634 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
636 last THISROUND if $main::please_freeze;
637 if ($main::please_info)
639 $main::please_info = 0;
643 update_progress_stats();
650 check_refresh_wanted();
652 update_progress_stats();
653 select (undef, undef, undef, 0.1);
655 elsif (time - $progress_stats_updated >= 30)
657 update_progress_stats();
659 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
660 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
662 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
663 .($thisround_failed+$thisround_succeeded)
664 .") -- giving up on this round";
665 Log (undef, $message);
669 # move slots from freeslot to holdslot (or back to freeslot) if necessary
670 for (my $i=$#freeslot; $i>=0; $i--) {
671 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
672 push @holdslot, (splice @freeslot, $i, 1);
675 for (my $i=$#holdslot; $i>=0; $i--) {
676 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
677 push @freeslot, (splice @holdslot, $i, 1);
681 # give up if no nodes are succeeding
682 if (!grep { $_->{node}->{losing_streak} == 0 &&
683 $_->{node}->{hold_count} < 4 } @slot) {
684 my $message = "Every node has failed -- giving up on this round";
685 Log (undef, $message);
692 push @freeslot, splice @holdslot;
693 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
696 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
699 if ($main::please_continue) {
700 $main::please_continue = 0;
703 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
707 check_refresh_wanted();
709 update_progress_stats();
710 select (undef, undef, undef, 0.1);
711 killem (keys %proc) if $main::please_freeze;
715 update_progress_stats();
716 freeze_if_want_freeze();
719 if (!defined $main::success)
722 $thisround_succeeded == 0 &&
723 ($thisround_failed == 0 || $thisround_failed > 4))
725 my $message = "stop because $thisround_failed tasks failed and none succeeded";
726 Log (undef, $message);
735 goto ONELEVEL if !defined $main::success;
738 release_allocation();
741 $Job->update_attributes('output' => &collate_output(),
743 'success' => $Job->{'output'} && $main::success,
744 'finished_at' => scalar gmtime)
747 if ($Job->{'output'})
750 my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
751 $arv->{'collections'}->{'create'}->execute('collection' => {
752 'uuid' => $Job->{'output'},
753 'manifest_text' => $manifest_text,
755 if ($Job->{'output_is_persistent'}) {
756 $arv->{'links'}->{'create'}->execute('link' => {
757 'tail_kind' => 'arvados#user',
758 'tail_uuid' => $User->{'uuid'},
759 'head_kind' => 'arvados#collection',
760 'head_uuid' => $Job->{'output'},
761 'link_class' => 'resources',
767 Log (undef, "Failed to register output manifest: $@");
771 Log (undef, "finish");
778 sub update_progress_stats
780 $progress_stats_updated = time;
781 return if !$progress_is_dirty;
782 my ($todo, $done, $running) = (scalar @jobstep_todo,
783 scalar @jobstep_done,
784 scalar @slot - scalar @freeslot - scalar @holdslot);
785 $Job->{'tasks_summary'} ||= {};
786 $Job->{'tasks_summary'}->{'todo'} = $todo;
787 $Job->{'tasks_summary'}->{'done'} = $done;
788 $Job->{'tasks_summary'}->{'running'} = $running;
790 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
792 Log (undef, "status: $done done, $running running, $todo todo");
793 $progress_is_dirty = 0;
800 my $pid = waitpid (-1, WNOHANG);
801 return 0 if $pid <= 0;
803 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
805 . $slot[$proc{$pid}->{slot}]->{cpu});
806 my $jobstepid = $proc{$pid}->{jobstep};
807 my $elapsed = time - $proc{$pid}->{time};
808 my $Jobstep = $jobstep[$jobstepid];
810 my $childstatus = $?;
811 my $exitvalue = $childstatus >> 8;
812 my $exitinfo = sprintf("exit %d signal %d%s",
815 ($childstatus & 128 ? ' core dump' : ''));
816 $Jobstep->{'arvados_task'}->reload;
817 my $task_success = $Jobstep->{'arvados_task'}->{success};
819 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
821 if (!defined $task_success) {
822 # task did not indicate one way or the other --> fail
823 $Jobstep->{'arvados_task'}->{success} = 0;
824 $Jobstep->{'arvados_task'}->save;
831 $temporary_fail ||= $Jobstep->{node_fail};
832 $temporary_fail ||= ($exitvalue == 111);
835 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
837 # Check for signs of a failed or misconfigured node
838 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
839 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
840 # Don't count this against jobstep failure thresholds if this
841 # node is already suspected faulty and srun exited quickly
842 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
844 Log ($jobstepid, "blaming failure on suspect node " .
845 $slot[$proc{$pid}->{slot}]->{node}->{name});
846 $temporary_fail ||= 1;
848 ban_node_by_slot($proc{$pid}->{slot});
851 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
852 ++$Jobstep->{'failures'},
853 $temporary_fail ? 'temporary ' : 'permanent',
856 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
857 # Give up on this task, and the whole job
859 $main::please_freeze = 1;
862 # Put this task back on the todo queue
863 push @jobstep_todo, $jobstepid;
865 $Job->{'tasks_summary'}->{'failed'}++;
869 ++$thisround_succeeded;
870 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
871 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
872 push @jobstep_done, $jobstepid;
873 Log ($jobstepid, "success in $elapsed seconds");
875 $Jobstep->{exitcode} = $childstatus;
876 $Jobstep->{finishtime} = time;
877 process_stderr ($jobstepid, $task_success);
878 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
880 close $reader{$jobstepid};
881 delete $reader{$jobstepid};
882 delete $slot[$proc{$pid}->{slot}]->{pid};
883 push @freeslot, $proc{$pid}->{slot};
887 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
889 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
891 'order' => 'qsequence'
893 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
895 'level' => $arvados_task->{'sequence'},
897 'arvados_task' => $arvados_task
899 push @jobstep, $jobstep;
900 push @jobstep_todo, $#jobstep;
903 $progress_is_dirty = 1;
907 sub check_refresh_wanted
909 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
910 if (@stat && $stat[9] > $latest_refresh) {
911 $latest_refresh = scalar time;
913 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
914 for my $attr ('cancelled_at',
915 'cancelled_by_user_uuid',
916 'cancelled_by_client_uuid') {
917 $Job->{$attr} = $Job2->{$attr};
919 if ($Job->{'cancelled_at'}) {
920 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
921 " by user " . $Job->{cancelled_by_user_uuid});
923 $main::please_freeze = 1;
931 # return if the kill list was checked <4 seconds ago
932 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
936 $squeue_kill_checked = time;
938 # use killem() on procs whose killtime is reached
941 if (exists $proc{$_}->{killtime}
942 && $proc{$_}->{killtime} <= time)
948 # return if the squeue was checked <60 seconds ago
949 if (defined $squeue_checked && $squeue_checked > time - 60)
953 $squeue_checked = time;
957 # here is an opportunity to check for mysterious problems with local procs
961 # get a list of steps still running
962 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
964 if ($squeue[-1] ne "ok")
970 # which of my jobsteps are running, according to squeue?
974 if (/^(\d+)\.(\d+) (\S+)/)
976 if ($1 eq $ENV{SLURM_JOBID})
983 # which of my active child procs (>60s old) were not mentioned by squeue?
986 if ($proc{$_}->{time} < time - 60
987 && !exists $ok{$proc{$_}->{jobstepname}}
988 && !exists $proc{$_}->{killtime})
990 # kill this proc if it hasn't exited in 30 seconds
991 $proc{$_}->{killtime} = time + 30;
997 sub release_allocation
1001 Log (undef, "release job allocation");
1002 system "scancel $ENV{SLURM_JOBID}";
1010 foreach my $job (keys %reader)
1013 while (0 < sysread ($reader{$job}, $buf, 8192))
1015 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1016 $jobstep[$job]->{stderr} .= $buf;
1017 preprocess_stderr ($job);
1018 if (length ($jobstep[$job]->{stderr}) > 16384)
1020 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1029 sub preprocess_stderr
1033 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1035 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1036 Log ($job, "stderr $line");
1037 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1039 $main::please_freeze = 1;
1041 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1042 $jobstep[$job]->{node_fail} = 1;
1043 ban_node_by_slot($jobstep[$job]->{slotindex});
1052 my $task_success = shift;
1053 preprocess_stderr ($job);
1056 Log ($job, "stderr $_");
1057 } split ("\n", $jobstep[$job]->{stderr});
1063 my ($keep, $child_out, $output_block);
1065 my $cmd = "arv keep get \Q$hash\E";
# Run the fetch command and read its entire output.  A single sysread()
# on a pipe may return fewer bytes than the child eventually writes, which
# silently truncated larger blocks; slurping with $/ undefined reads until
# EOF.  An empty output still yields "" (not undef), as before.
# NOTE(review): the old call capped the read at 64 MiB; this assumes the
# fetched block never exceeds that -- confirm.
1066 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1067 { local $/; $output_block = <$keep>; }
1069 return $output_block;
1074 Log (undef, "collate");
1076 my ($child_out, $child_in);
1077 my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1081 next if (!exists $_->{'arvados_task'}->{output} ||
1082 !$_->{'arvados_task'}->{'success'} ||
1083 $_->{'exitcode'} != 0);
1084 my $output = $_->{'arvados_task'}->{output};
1085 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1087 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1088 print $child_in $output;
1090 elsif (@jobstep == 1)
1092 $joboutput = $output;
1095 elsif (defined (my $outblock = fetch_block ($output)))
1097 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1098 print $child_in $outblock;
1102 Log (undef, "XXX fetch_block($output) failed XXX");
1108 if (!defined $joboutput) {
1109 my $s = IO::Select->new($child_out);
1110 if ($s->can_read(120)) {
1111 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1114 Log (undef, "timed out reading from 'arv keep put'");
1121 Log (undef, "output $joboutput");
1122 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1126 Log (undef, "output undef");
1136 my $sig = 2; # SIGINT first
1137 if (exists $proc{$_}->{"sent_$sig"} &&
1138 time - $proc{$_}->{"sent_$sig"} > 4)
1140 $sig = 15; # SIGTERM if SIGINT doesn't work
1142 if (exists $proc{$_}->{"sent_$sig"} &&
1143 time - $proc{$_}->{"sent_$sig"} > 4)
1145 $sig = 9; # SIGKILL if SIGTERM doesn't work
1147 if (!exists $proc{$_}->{"sent_$sig"})
1149 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1151 select (undef, undef, undef, 0.1);
1154 kill $sig, $_; # srun wants two SIGINT to really interrupt
1156 $proc{$_}->{"sent_$sig"} = time;
1157 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1167 vec($bits,fileno($_),1) = 1;
1173 sub Log # ($jobstep_id, $logmessage)
1175 if ($_[1] =~ /\n/) {
1176 for my $line (split (/\n/, $_[1])) {
1181 my $fh = select STDERR; $|=1; select $fh;
1182 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1183 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1186 if ($metastream || -t STDERR) {
1187 my @gmtime = gmtime;
1188 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1189 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1191 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1194 print $metastream $datetime . " " . $message;
1201 my ($package, $file, $line) = caller;
1202 my $message = "@_ at $file line $line\n";
1203 Log (undef, $message);
1204 freeze() if @jobstep_todo;
1205 collate_output() if @jobstep_todo;
1207 save_meta() if $metastream;
1214 return if !$job_has_uuid;
1215 $Job->update_attributes('running' => 0,
1217 'finished_at' => scalar gmtime);
1223 my $justcheckpoint = shift; # false if this will be the last meta saved
1224 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1226 $local_logfile->flush;
1227 my $cmd = "arv keep put --filename \Q$keep_logfile\E "
1228 . quotemeta($local_logfile->filename);
1229 my $loglocator = `$cmd`;
1230 die "system $cmd failed: $?" if $?;
1232 $local_logfile = undef; # the temp file is automatically deleted
1233 Log (undef, "log manifest is $loglocator");
1234 $Job->{'log'} = $loglocator;
1235 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1239 sub freeze_if_want_freeze
1241 if ($main::please_freeze)
1243 release_allocation();
1246 # kill some srun procs before freeze+stop
1247 map { $proc{$_} = {} } @_;
1250 killem (keys %proc);
1251 select (undef, undef, undef, 0.1);
1253 while (($died = waitpid (-1, WNOHANG)) > 0)
1255 delete $proc{$died};
1270 Log (undef, "Freeze not implemented");
1277 croak ("Thaw not implemented");
1293 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1300 my $srunargs = shift;
1301 my $execargs = shift;
1302 my $opts = shift || {};
1304 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1305 print STDERR (join (" ",
1306 map { / / ? "'$_'" : $_ }
1309 if $ENV{CRUNCH_DEBUG};
1311 if (defined $stdin) {
1312 my $child = open STDIN, "-|";
1313 defined $child or die "no fork: $!";
1315 print $stdin or die $!;
1316 close STDOUT or die $!;
1321 return system (@$args) if $opts->{fork};
1324 warn "ENV size is ".length(join(" ",%ENV));
1325 die "exec failed: $!: @$args";
1329 sub ban_node_by_slot {
1330 # Don't start any new jobsteps on this node for 60 seconds
1332 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1333 $slot[$slotid]->{node}->{hold_count}++;
1334 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Arguments: path of the lock file, and text appended to the error if the
# lock cannot be obtained.
1339 my ($lockfile, $error_message) = @_;
# L is a bareword (package-global) filehandle, so it is not closed when
# this sub returns.  NOTE(review): presumably deliberate, so that the flock
# outlives the call -- confirm before converting to a lexical handle.
1340 open L, ">", $lockfile or croak("$lockfile: $!");
# LOCK_NB: do not wait for a competing holder; fail immediately instead.
1341 if (!flock L, LOCK_EX|LOCK_NB) {
1342 croak("Can't lock $lockfile: $error_message\n");
1349 # checkout-and-build
1353 my $destdir = $ENV{"CRUNCH_SRC"};
1354 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1355 my $repo = $ENV{"CRUNCH_SRC_URL"};
1357 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1359 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1363 unlink "$destdir.commit";
1364 open STDOUT, ">", "$destdir.log";
1365 open STDERR, ">&STDOUT";
1368 my @git_archive_data = <DATA>;
1369 if (@git_archive_data) {
1370 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1371 print TARX @git_archive_data;
1373 die "'tar -C $destdir -xf -' exited $?: $!";
1378 chomp ($pwd = `pwd`);
1379 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1382 for my $src_path ("$destdir/arvados/sdk/python") {
1384 shell_or_die ("virtualenv", $install_dir);
1385 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1389 if (-e "$destdir/crunch_scripts/install") {
1390 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1391 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1393 shell_or_die ("./tests/autotests.sh", $install_dir);
1394 } elsif (-e "./install.sh") {
1395 shell_or_die ("./install.sh", $install_dir);
1399 unlink "$destdir.commit.new";
1400 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1401 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1410 if ($ENV{"DEBUG"}) {
1411 print STDERR "@_\n";
1414 or die "@_ failed: $! exit 0x".sprintf("%x",$?);