2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88 if ($ENV{"USER"} ne "crunch" && $< != 0) {
89 # use a tmp dir unique for my uid
90 $ENV{"CRUNCH_TMP"} .= "-$<";
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
104 GetOptions('force-unlock' => \$force_unlock,
105 'git-dir=s' => \$git_dir,
106 'job=s' => \$jobspec,
107 'job-api-token=s' => \$job_api_token,
108 'no-clear-tmp' => \$no_clear_tmp,
109 'resume-stash=s' => \$resume_stash,
112 if (defined $job_api_token) {
113 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
123 $main::ENV{CRUNCH_DEBUG} = 1;
127 $main::ENV{CRUNCH_DEBUG} = 0;
132 my $arv = Arvados->new('apiVersion' => 'v1');
135 my $User = $arv->{'users'}->{'current'}->execute;
143 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144 if (!$force_unlock) {
145 if ($Job->{'is_locked_by_uuid'}) {
146 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# Refuse to run a job that already has a result.  'success' is undef until
# the job finishes, so test definedness explicitly instead of string-comparing
# against undef (which also triggers "uninitialized value" warnings).
if (defined $Job->{'success'}) {
  croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
151 if ($Job->{'running'}) {
152 croak("Job 'running' flag is already set");
154 if ($Job->{'started_at'}) {
155 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
161 $Job = JSON::decode_json($jobspec);
165 map { croak ("No $_ specified") unless $Job->{$_} }
166 qw(script script_version script_parameters);
169 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170 $Job->{'started_at'} = gmtime;
172 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
176 $job_id = $Job->{'uuid'};
178 my $keep_logfile = $job_id . '.log.txt';
179 my $local_logfile = File::Temp->new();
181 $Job->{'runtime_constraints'} ||= {};
182 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
183 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
186 Log (undef, "check slurm allocation");
189 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
193 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
194 push @sinfo, "$localcpus localhost";
196 if (exists $ENV{SLURM_NODELIST})
198 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
202 my ($ncpus, $slurm_nodelist) = split;
203 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
206 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
209 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
212 foreach (split (",", $ranges))
225 push @nodelist, map {
227 $n =~ s/\[[-,\d]+\]/$_/;
234 push @nodelist, $nodelist;
237 foreach my $nodename (@nodelist)
239 Log (undef, "node $nodename - $ncpus slots");
240 my $node = { name => $nodename,
244 foreach my $cpu (1..$ncpus)
246 push @slot, { node => $node,
250 push @node, @nodelist;
255 # Ensure that we get one jobstep running on each allocated node before
256 # we start overloading nodes with concurrent steps
258 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
265 # Claim this job, and make sure nobody else does
# UUIDs are strings, so they must be compared with eq.  Numeric == would
# compare only their leading numeric prefixes (usually 0 == 0), letting the
# check pass even when the lock is held by a different user.
unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
        $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
  croak("Error while updating / locking job");
270 $Job->update_attributes('started_at' => scalar gmtime,
273 'tasks_summary' => { 'failed' => 0,
280 Log (undef, "start");
281 $SIG{'INT'} = sub { $main::please_freeze = 1; };
282 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
283 $SIG{'TERM'} = \&croak;
284 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
285 $SIG{'ALRM'} = sub { $main::please_info = 1; };
286 $SIG{'CONT'} = sub { $main::please_continue = 1; };
287 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
289 $main::please_freeze = 0;
290 $main::please_info = 0;
291 $main::please_continue = 0;
292 $main::please_refresh = 0;
293 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Export every NOCACHE*=value knob of the job into the environment.
# A plain loop replaces grep-in-void-context, and a job record without
# any knobs is treated as an empty list rather than splitting undef.
foreach (split ("\n", defined $$Job{knobs} ? $$Job{knobs} : "")) {
  $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/;
}
296 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
297 $ENV{"JOB_UUID"} = $job_id;
301 my @jobstep_todo = ();
302 my @jobstep_done = ();
303 my @jobstep_tomerge = ();
304 my $jobstep_tomerge_level = 0;
306 my $squeue_kill_checked;
307 my $output_in_keep = 0;
308 my $latest_refresh = scalar time;
312 if (defined $Job->{thawedfromkey})
314 thaw ($Job->{thawedfromkey});
318 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
319 'job_uuid' => $Job->{'uuid'},
324 push @jobstep, { 'level' => 0,
326 'arvados_task' => $first_task,
328 push @jobstep_todo, 0;
334 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
341 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
343 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
346 if (!defined $no_clear_tmp) {
347 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
348 system($clear_tmp_cmd) == 0
349 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
351 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
352 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
354 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
355 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
356 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
358 or croak ("setup.py in $src_path failed: exit ".($?>>8));
366 $build_script = <DATA>;
368 Log (undef, "Install revision ".$Job->{script_version});
369 my $nodelist = join(",", @node);
371 if (!defined $no_clear_tmp) {
372 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
374 my $cleanpid = fork();
377 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
378 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
383 last if $cleanpid == waitpid (-1, WNOHANG);
384 freeze_if_want_freeze ($cleanpid);
385 select (undef, undef, undef, 0.1);
387 Log (undef, "Clean-work-dir exited $?");
390 # Install requested code version
393 my @srunargs = ("srun",
394 "--nodelist=$nodelist",
395 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
397 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
398 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
402 my $treeish = $Job->{'script_version'};
403 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
404 # Todo: let script_version specify repository instead of expecting
405 # parent process to figure it out.
406 $ENV{"CRUNCH_SRC_URL"} = $repo;
408 # Create/update our clone of the remote git repo
410 if (!-d $ENV{"CRUNCH_SRC"}) {
411 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
412 or croak ("git clone $repo failed: exit ".($?>>8));
413 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
415 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q --tags origin`;
417 # If this looks like a subversion r#, look for it in git-svn commit messages
419 if ($treeish =~ m{^\d{1,4}$}) {
420 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
422 if ($gitlog =~ /^[a-f0-9]{40}$/) {
424 Log (undef, "Using commit $commit for script_version $treeish");
428 # If that didn't work, try asking git to look it up as a tree-ish.
430 if (!defined $commit) {
432 my $cooked_treeish = $treeish;
433 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
434 # Looks like a git branch name -- make sure git knows it's
435 # relative to the remote repo
436 $cooked_treeish = "origin/$treeish";
# script_version comes from the job record, so quote it before handing it to
# the shell; otherwise metacharacters in the tree-ish would be interpreted.
my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 \Q$cooked_treeish\E`;
441 if ($found =~ /^[0-9a-f]{40}$/s) {
443 if ($commit ne $treeish) {
444 # Make sure we record the real commit id in the database,
445 # frozentokey, logs, etc. -- instead of an abbreviation or a
446 # branch name which can become ambiguous or point to a
447 # different commit in the future.
448 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
449 Log (undef, "Using commit $commit for tree-ish $treeish");
450 if ($commit ne $treeish) {
451 $Job->{'script_version'} = $commit;
453 $Job->update_attributes('script_version' => $commit) or
454 croak("Error while updating job");
460 if (defined $commit) {
461 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
462 @execargs = ("sh", "-c",
463 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
464 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
467 croak ("could not figure out commit id for $treeish");
470 my $installpid = fork();
471 if ($installpid == 0)
473 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
478 last if $installpid == waitpid (-1, WNOHANG);
479 freeze_if_want_freeze ($installpid);
480 select (undef, undef, undef, 0.1);
482 Log (undef, "Install exited $?");
487 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
488 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
493 foreach (qw (script script_version script_parameters runtime_constraints))
497 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
499 foreach (split (/\n/, $Job->{knobs}))
501 Log (undef, "knob " . $_);
506 $main::success = undef;
512 my $thisround_succeeded = 0;
513 my $thisround_failed = 0;
514 my $thisround_failed_multiple = 0;
516 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
517 or $a <=> $b } @jobstep_todo;
518 my $level = $jobstep[$jobstep_todo[0]]->{level};
519 Log (undef, "start level $level");
524 my @freeslot = (0..$#slot);
527 my $progress_is_dirty = 1;
528 my $progress_stats_updated = 0;
530 update_progress_stats();
535 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
537 my $id = $jobstep_todo[$todo_ptr];
538 my $Jobstep = $jobstep[$id];
539 if ($Jobstep->{level} != $level)
544 pipe $reader{$id}, "writer" or croak ($!);
545 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
546 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
548 my $childslot = $freeslot[0];
549 my $childnode = $slot[$childslot]->{node};
550 my $childslotname = join (".",
551 $slot[$childslot]->{node}->{name},
552 $slot[$childslot]->{cpu});
553 my $childpid = fork();
556 $SIG{'INT'} = 'DEFAULT';
557 $SIG{'QUIT'} = 'DEFAULT';
558 $SIG{'TERM'} = 'DEFAULT';
560 foreach (values (%reader))
564 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
565 open(STDOUT,">&writer");
566 open(STDERR,">&writer");
571 delete $ENV{"GNUPGHOME"};
572 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
573 $ENV{"TASK_QSEQUENCE"} = $id;
574 $ENV{"TASK_SEQUENCE"} = $level;
575 $ENV{"JOB_SCRIPT"} = $Job->{script};
576 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
577 $param =~ tr/a-z/A-Z/;
578 $ENV{"JOB_PARAMETER_$param"} = $value;
580 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
581 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
582 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
583 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
584 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
585 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
586 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
592 "--nodelist=".$childnode->{name},
593 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
594 "--job-name=$job_id.$id.$$",
596 my @execargs = qw(sh);
597 my $build_script_to_send = "";
599 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
600 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
601 ."&& cd $ENV{CRUNCH_TMP} ";
604 $build_script_to_send = $build_script;
609 "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
610 my @execargs = ('bash', '-c', $command);
611 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
615 if (!defined $childpid)
622 $proc{$childpid} = { jobstep => $id,
625 jobstepname => "$job_id.$id.$childpid",
627 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
628 $slot[$childslot]->{pid} = $childpid;
630 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
631 Log ($id, "child $childpid started on $childslotname");
632 $Jobstep->{starttime} = time;
633 $Jobstep->{node} = $childnode->{name};
634 $Jobstep->{slotindex} = $childslot;
635 delete $Jobstep->{stderr};
636 delete $Jobstep->{finishtime};
638 splice @jobstep_todo, $todo_ptr, 1;
641 $progress_is_dirty = 1;
645 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
647 last THISROUND if $main::please_freeze;
648 if ($main::please_info)
650 $main::please_info = 0;
654 update_progress_stats();
661 check_refresh_wanted();
663 update_progress_stats();
664 select (undef, undef, undef, 0.1);
666 elsif (time - $progress_stats_updated >= 30)
668 update_progress_stats();
670 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
671 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
673 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
674 .($thisround_failed+$thisround_succeeded)
675 .") -- giving up on this round";
676 Log (undef, $message);
680 # move slots from freeslot to holdslot (or back to freeslot) if necessary
681 for (my $i=$#freeslot; $i>=0; $i--) {
682 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
683 push @holdslot, (splice @freeslot, $i, 1);
686 for (my $i=$#holdslot; $i>=0; $i--) {
687 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
688 push @freeslot, (splice @holdslot, $i, 1);
692 # give up if no nodes are succeeding
693 if (!grep { $_->{node}->{losing_streak} == 0 &&
694 $_->{node}->{hold_count} < 4 } @slot) {
695 my $message = "Every node has failed -- giving up on this round";
696 Log (undef, $message);
703 push @freeslot, splice @holdslot;
704 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
707 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
710 if ($main::please_continue) {
711 $main::please_continue = 0;
714 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
718 check_refresh_wanted();
720 update_progress_stats();
721 select (undef, undef, undef, 0.1);
722 killem (keys %proc) if $main::please_freeze;
726 update_progress_stats();
727 freeze_if_want_freeze();
730 if (!defined $main::success)
733 $thisround_succeeded == 0 &&
734 ($thisround_failed == 0 || $thisround_failed > 4))
736 my $message = "stop because $thisround_failed tasks failed and none succeeded";
737 Log (undef, $message);
746 goto ONELEVEL if !defined $main::success;
749 release_allocation();
752 $Job->update_attributes('output' => &collate_output(),
754 'success' => $Job->{'output'} && $main::success,
755 'finished_at' => scalar gmtime)
758 if ($Job->{'output'})
761 my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
762 $arv->{'collections'}->{'create'}->execute('collection' => {
763 'uuid' => $Job->{'output'},
764 'manifest_text' => $manifest_text,
766 if ($Job->{'output_is_persistent'}) {
767 $arv->{'links'}->{'create'}->execute('link' => {
768 'tail_kind' => 'arvados#user',
769 'tail_uuid' => $User->{'uuid'},
770 'head_kind' => 'arvados#collection',
771 'head_uuid' => $Job->{'output'},
772 'link_class' => 'resources',
778 Log (undef, "Failed to register output manifest: $@");
782 Log (undef, "finish");
789 sub update_progress_stats
791 $progress_stats_updated = time;
792 return if !$progress_is_dirty;
793 my ($todo, $done, $running) = (scalar @jobstep_todo,
794 scalar @jobstep_done,
795 scalar @slot - scalar @freeslot - scalar @holdslot);
796 $Job->{'tasks_summary'} ||= {};
797 $Job->{'tasks_summary'}->{'todo'} = $todo;
798 $Job->{'tasks_summary'}->{'done'} = $done;
799 $Job->{'tasks_summary'}->{'running'} = $running;
801 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
803 Log (undef, "status: $done done, $running running, $todo todo");
804 $progress_is_dirty = 0;
811 my $pid = waitpid (-1, WNOHANG);
812 return 0 if $pid <= 0;
814 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
816 . $slot[$proc{$pid}->{slot}]->{cpu});
817 my $jobstepid = $proc{$pid}->{jobstep};
818 my $elapsed = time - $proc{$pid}->{time};
819 my $Jobstep = $jobstep[$jobstepid];
821 my $childstatus = $?;
822 my $exitvalue = $childstatus >> 8;
823 my $exitinfo = sprintf("exit %d signal %d%s",
826 ($childstatus & 128 ? ' core dump' : ''));
827 $Jobstep->{'arvados_task'}->reload;
828 my $task_success = $Jobstep->{'arvados_task'}->{success};
830 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
832 if (!defined $task_success) {
833 # task did not indicate one way or the other --> fail
834 $Jobstep->{'arvados_task'}->{success} = 0;
835 $Jobstep->{'arvados_task'}->save;
842 $temporary_fail ||= $Jobstep->{node_fail};
843 $temporary_fail ||= ($exitvalue == 111);
846 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
848 # Check for signs of a failed or misconfigured node
849 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
850 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
851 # Don't count this against jobstep failure thresholds if this
852 # node is already suspected faulty and srun exited quickly
853 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
855 Log ($jobstepid, "blaming failure on suspect node " .
856 $slot[$proc{$pid}->{slot}]->{node}->{name});
857 $temporary_fail ||= 1;
859 ban_node_by_slot($proc{$pid}->{slot});
862 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
863 ++$Jobstep->{'failures'},
864 $temporary_fail ? 'temporary ' : 'permanent',
867 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
868 # Give up on this task, and the whole job
870 $main::please_freeze = 1;
873 # Put this task back on the todo queue
874 push @jobstep_todo, $jobstepid;
876 $Job->{'tasks_summary'}->{'failed'}++;
880 ++$thisround_succeeded;
881 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
882 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
883 push @jobstep_done, $jobstepid;
884 Log ($jobstepid, "success in $elapsed seconds");
886 $Jobstep->{exitcode} = $childstatus;
887 $Jobstep->{finishtime} = time;
888 process_stderr ($jobstepid, $task_success);
889 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
891 close $reader{$jobstepid};
892 delete $reader{$jobstepid};
893 delete $slot[$proc{$pid}->{slot}]->{pid};
894 push @freeslot, $proc{$pid}->{slot};
898 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
900 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
902 'order' => 'qsequence'
904 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
906 'level' => $arvados_task->{'sequence'},
908 'arvados_task' => $arvados_task
910 push @jobstep, $jobstep;
911 push @jobstep_todo, $#jobstep;
914 $progress_is_dirty = 1;
918 sub check_refresh_wanted
920 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
921 if (@stat && $stat[9] > $latest_refresh) {
922 $latest_refresh = scalar time;
924 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
925 for my $attr ('cancelled_at',
926 'cancelled_by_user_uuid',
927 'cancelled_by_client_uuid') {
928 $Job->{$attr} = $Job2->{$attr};
930 if ($Job->{'cancelled_at'}) {
931 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
932 " by user " . $Job->{cancelled_by_user_uuid});
934 $main::please_freeze = 1;
942 # return if the kill list was checked <4 seconds ago
943 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
947 $squeue_kill_checked = time;
949 # use killem() on procs whose killtime is reached
952 if (exists $proc{$_}->{killtime}
953 && $proc{$_}->{killtime} <= time)
959 # return if the squeue was checked <60 seconds ago
960 if (defined $squeue_checked && $squeue_checked > time - 60)
964 $squeue_checked = time;
968 # here is an opportunity to check for mysterious problems with local procs
972 # get a list of steps still running
973 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
975 if ($squeue[-1] ne "ok")
981 # which of my jobsteps are running, according to squeue?
985 if (/^(\d+)\.(\d+) (\S+)/)
987 if ($1 eq $ENV{SLURM_JOBID})
994 # which of my active child procs (>60s old) were not mentioned by squeue?
997 if ($proc{$_}->{time} < time - 60
998 && !exists $ok{$proc{$_}->{jobstepname}}
999 && !exists $proc{$_}->{killtime})
1001 # kill this proc if it hasn't exited in 30 seconds
1002 $proc{$_}->{killtime} = time + 30;
1008 sub release_allocation
1012 Log (undef, "release job allocation");
1013 system "scancel $ENV{SLURM_JOBID}";
1021 foreach my $job (keys %reader)
1024 while (0 < sysread ($reader{$job}, $buf, 8192))
1026 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1027 $jobstep[$job]->{stderr} .= $buf;
1028 preprocess_stderr ($job);
1029 if (length ($jobstep[$job]->{stderr}) > 16384)
1031 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1040 sub preprocess_stderr
1044 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1046 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1047 Log ($job, "stderr $line");
1048 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1050 $main::please_freeze = 1;
1052 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1053 $jobstep[$job]->{node_fail} = 1;
1054 ban_node_by_slot($jobstep[$job]->{slotindex});
1063 my $task_success = shift;
1064 preprocess_stderr ($job);
1067 Log ($job, "stderr $_");
1068 } split ("\n", $jobstep[$job]->{stderr});
1074 my ($keep, $child_out, $output_block);
1076 my $cmd = "arv keep get \Q$hash\E";
1077 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# A single sysread() on a pipe returns only the data currently buffered, so
# one call can silently truncate a large block.  Read until EOF instead.
$output_block = '';
while (1) {
  my $chunk;
  my $bytes = sysread($keep, $chunk, 1024 * 1024);
  if (!defined $bytes) {
    # Interrupted by one of our signal handlers: just retry the read.
    next if $!{EINTR};
    # Real read error: return undef so the caller reports the failure.
    $output_block = undef;
    last;
  }
  last if $bytes == 0;          # EOF
  $output_block .= $chunk;
}
1080 return $output_block;
1085 Log (undef, "collate");
1087 my ($child_out, $child_in);
1088 my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1092 next if (!exists $_->{'arvados_task'}->{output} ||
1093 !$_->{'arvados_task'}->{'success'} ||
1094 $_->{'exitcode'} != 0);
1095 my $output = $_->{'arvados_task'}->{output};
1096 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1098 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1099 print $child_in $output;
1101 elsif (@jobstep == 1)
1103 $joboutput = $output;
1106 elsif (defined (my $outblock = fetch_block ($output)))
1108 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1109 print $child_in $outblock;
1113 Log (undef, "XXX fetch_block($output) failed XXX");
1119 if (!defined $joboutput) {
1120 my $s = IO::Select->new($child_out);
1121 if ($s->can_read(120)) {
1122 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1125 Log (undef, "timed out reading from 'arv keep put'");
1132 Log (undef, "output $joboutput");
1133 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1137 Log (undef, "output undef");
1147 my $sig = 2; # SIGINT first
1148 if (exists $proc{$_}->{"sent_$sig"} &&
1149 time - $proc{$_}->{"sent_$sig"} > 4)
1151 $sig = 15; # SIGTERM if SIGINT doesn't work
1153 if (exists $proc{$_}->{"sent_$sig"} &&
1154 time - $proc{$_}->{"sent_$sig"} > 4)
1156 $sig = 9; # SIGKILL if SIGTERM doesn't work
1158 if (!exists $proc{$_}->{"sent_$sig"})
1160 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1162 select (undef, undef, undef, 0.1);
1165 kill $sig, $_; # srun wants two SIGINT to really interrupt
1167 $proc{$_}->{"sent_$sig"} = time;
1168 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1178 vec($bits,fileno($_),1) = 1;
1184 sub Log # ($jobstep_id, $logmessage)
1186 if ($_[1] =~ /\n/) {
1187 for my $line (split (/\n/, $_[1])) {
1192 my $fh = select STDERR; $|=1; select $fh;
1193 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1194 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1197 if ($metastream || -t STDERR) {
1198 my @gmtime = gmtime;
1199 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1200 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1202 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1205 print $metastream $datetime . " " . $message;
1212 my ($package, $file, $line) = caller;
1213 my $message = "@_ at $file line $line\n";
1214 Log (undef, $message);
1215 freeze() if @jobstep_todo;
1216 collate_output() if @jobstep_todo;
1218 save_meta() if $metastream;
1225 return if !$job_has_uuid;
1226 $Job->update_attributes('running' => 0,
1228 'finished_at' => scalar gmtime);
1234 my $justcheckpoint = shift; # false if this will be the last meta saved
1235 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1237 $local_logfile->flush;
1238 my $cmd = "arv keep put --filename \Q$keep_logfile\E "
1239 . quotemeta($local_logfile->filename);
1240 my $loglocator = `$cmd`;
1241 die "system $cmd failed: $?" if $?;
1243 $local_logfile = undef; # the temp file is automatically deleted
1244 Log (undef, "log manifest is $loglocator");
1245 $Job->{'log'} = $loglocator;
1246 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1250 sub freeze_if_want_freeze
1252 if ($main::please_freeze)
1254 release_allocation();
1257 # kill some srun procs before freeze+stop
1258 map { $proc{$_} = {} } @_;
1261 killem (keys %proc);
1262 select (undef, undef, undef, 0.1);
1264 while (($died = waitpid (-1, WNOHANG)) > 0)
1266 delete $proc{$died};
1281 Log (undef, "Freeze not implemented");
1288 croak ("Thaw not implemented");
1304 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1311 my $srunargs = shift;
1312 my $execargs = shift;
1313 my $opts = shift || {};
1315 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1316 print STDERR (join (" ",
1317 map { / / ? "'$_'" : $_ }
1320 if $ENV{CRUNCH_DEBUG};
1322 if (defined $stdin) {
1323 my $child = open STDIN, "-|";
1324 defined $child or die "no fork: $!";
1326 print $stdin or die $!;
1327 close STDOUT or die $!;
1332 return system (@$args) if $opts->{fork};
1335 warn "ENV size is ".length(join(" ",%ENV));
1336 die "exec failed: $!: @$args";
1340 sub ban_node_by_slot {
1341 # Don't start any new jobsteps on this node for 60 seconds
1343 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1344 $slot[$slotid]->{node}->{hold_count}++;
1345 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1350 my ($lockfile, $error_message) = @_;
1351 open L, ">", $lockfile or croak("$lockfile: $!");
1352 if (!flock L, LOCK_EX|LOCK_NB) {
1353 croak("Can't lock $lockfile: $error_message\n");
1360 # checkout-and-build
1364 my $destdir = $ENV{"CRUNCH_SRC"};
1365 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1366 my $repo = $ENV{"CRUNCH_SRC_URL"};
1368 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1370 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1374 unlink "$destdir.commit";
1375 open STDOUT, ">", "$destdir.log";
1376 open STDERR, ">&STDOUT";
1379 my @git_archive_data = <DATA>;
1380 if (@git_archive_data) {
1381 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1382 print TARX @git_archive_data;
1384 die "'tar -C $destdir -xf -' exited $?: $!";
1389 chomp ($pwd = `pwd`);
1390 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1393 for my $src_path ("$destdir/arvados/sdk/python") {
1395 shell_or_die ("virtualenv", $install_dir);
1396 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1400 if (-e "$destdir/crunch_scripts/install") {
1401 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1402 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1404 shell_or_die ("./tests/autotests.sh", $install_dir);
1405 } elsif (-e "./install.sh") {
1406 shell_or_die ("./install.sh", $install_dir);
1410 unlink "$destdir.commit.new";
1411 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1412 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1421 if ($ENV{"DEBUG"}) {
1422 print STDERR "@_\n";
1425 or die "@_ failed: $! exit 0x".sprintf("%x",$?);