2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
85 $ENV{"TMPDIR"} ||= "/tmp";  # Scratch-space setup: default TMPDIR, then derive CRUNCH_TMP and the job dirs under it.
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88 if ($ENV{"USER"} ne "crunch" && $< != 0) {  # neither the "crunch" user nor root
89 # use a tmp dir unique for my uid
90 $ENV{"CRUNCH_TMP"} .= "-$<";
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";  # install prefix for built code (its bin/ is prepended to task PATHs)
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});  # result not checked; the directory may already exist
100 if (defined $ENV{"ARV_CLI"}) {  # ARV_CLI overrides the command used for the "keep get" / "keep put" calls ($arv_cli)
101 $arv_cli = $ENV{"ARV_CLI"};
113 GetOptions('force-unlock' => \$force_unlock,  # command-line options; most are described in the POD at the top of the file
114 'git-dir=s' => \$git_dir,
115 'job=s' => \$jobspec,  # a job UUID, or a JSON job spec for a local run
116 'job-api-token=s' => \$job_api_token,
117 'no-clear-tmp' => \$no_clear_tmp,
118 'resume-stash=s' => \$resume_stash,
121 if (defined $job_api_token) {  # an explicit token replaces any ARVADOS_API_TOKEN from the environment
122 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};  # running inside a SLURM allocation? (srun() only prefixes srun when true)
126 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;  # a bare UUID-looking --job means "fetch that existing job"
127 my $local_job = !$job_has_uuid;  # otherwise --job is a JSON spec run locally
132 $main::ENV{CRUNCH_DEBUG} = 1;
136 $main::ENV{CRUNCH_DEBUG} = 0;
141 my $arv = Arvados->new('apiVersion' => 'v1');  # API client used for every job/task/collection call
144 my $User = $arv->{'users'}->{'current'}->execute;  # recorded below as the job's lock holder
152 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);  # --job is a UUID: fetch the existing record
153 if (!$force_unlock) {  # refuse jobs that already look locked, finished, or started
154 if ($Job->{'is_locked_by_uuid'}) {
155 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157 if (defined $Job->{'success'}) {  # null test via defined(); "ne undef" compared against '' and warned
158 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160 if ($Job->{'running'}) {
161 croak("Job 'running' flag is already set");
163 if ($Job->{'started_at'}) {
164 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
170 $Job = JSON::decode_json($jobspec);  # --job is a JSON spec: a new job record is created from it below
174 map { croak ("No $_ specified") unless $Job->{$_} }  # the spec must name these fields
175 qw(script script_version script_parameters);
178 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
179 $Job->{'started_at'} = gmtime;
181 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
185 $job_id = $Job->{'uuid'};
187 my $keep_logfile = $job_id . '.log.txt';  # file name used when the log is uploaded to Keep
188 my $local_logfile = File::Temp->new();  # local log buffer, uploaded by save_meta()
190 $Job->{'runtime_constraints'} ||= {};
191 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;  # 0 = no per-node cap on concurrent tasks
192 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
195 Log (undef, "check slurm allocation");  # Build @node/@slot: one slot per usable CPU on each allocated node.
198 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
202 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;  # local CPU count, at least 1
203 push @sinfo, "$localcpus localhost";  # @sinfo entries are "<ncpus> <nodelist>" strings
205 if (exists $ENV{SLURM_NODELIST})
207 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;  # one "<cpus> <nodes>" line per node group
211 my ($ncpus, $slurm_nodelist) = split;
212 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;  # apply runtime_constraints.max_tasks_per_node
215 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)  # peel off one "name" or "name[ranges]" entry
218 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)  # bracketed numeric ranges, e.g. "node[1-3,7]"
221 foreach (split (",", $ranges))
234 push @nodelist, map {
236 $n =~ s/\[[-,\d]+\]/$_/;  # substitute each number for the bracket expression
243 push @nodelist, $nodelist;  # presumably the no-brackets case: use the name as-is
246 foreach my $nodename (@nodelist)
248 Log (undef, "node $nodename - $ncpus slots");
249 my $node = { name => $nodename,  # one record per node, shared by all of its slots
253 foreach my $cpu (1..$ncpus)
255 push @slot, { node => $node,  # one slot per CPU
259 push @node, @nodelist;
264 # Ensure that we get one jobstep running on each allocated node before
265 # we start overloading nodes with concurrent steps
267 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274 # Claim this job, and make sure nobody else does
275 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
276 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {  # UUIDs are strings: numeric == compares only their numeric prefix (usually 0 == 0) and accepts any holder
277 croak("Error while updating / locking job");
279 $Job->update_attributes('started_at' => scalar gmtime,  # then record the start time and initial summary
282 'tasks_summary' => { 'failed' => 0,
289 Log (undef, "start");
290 $SIG{'INT'} = sub { $main::please_freeze = 1; };  # INT/QUIT/TSTP: request checkpoint-and-stop (see POD)
291 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
292 $SIG{'TERM'} = \&croak;  # TERM: log and clean up through croak()
293 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
294 $SIG{'ALRM'} = sub { $main::please_info = 1; };  # ALRM: checkpoint and continue
295 $SIG{'CONT'} = sub { $main::please_continue = 1; };
296 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298 $main::please_freeze = 0;
299 $main::please_info = 0;
300 $main::please_continue = 0;
301 $main::please_refresh = 0;
302 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
304 foreach (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/; }  # export NOCACHE* knobs (a loop, not grep in void context)
305 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
306 $ENV{"JOB_UUID"} = $job_id;
310 my @jobstep_todo = ();  # indices into @jobstep waiting to run
311 my @jobstep_done = ();  # indices into @jobstep that succeeded
312 my @jobstep_tomerge = ();
313 my $jobstep_tomerge_level = 0;
315 my $squeue_kill_checked;  # time of the last kill-list check (rate limit)
316 my $output_in_keep = 0;
317 my $latest_refresh = scalar time;  # a CRUNCH_REFRESH_TRIGGER mtime newer than this prompts a job re-read
321 if (defined $Job->{thawedfromkey})
323 thaw ($Job->{thawedfromkey});  # note: thaw() currently croaks "Thaw not implemented"
327 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {  # create the initial job_task record...
328 'job_uuid' => $Job->{'uuid'},
333 push @jobstep, { 'level' => 0,  # ...track it as a level-0 jobstep...
335 'arvados_task' => $first_task,
337 push @jobstep_todo, 0;  # ...and queue it (index 0)
343 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");  # one job at a time per CRUNCH_TMP
350 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});  # local job whose script_version is an absolute source path
355 if (!defined $no_clear_tmp) {
356 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';  # single quotes: the shell expands these from %ENV
357 system($clear_tmp_cmd) == 0
358 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360 $ENV{"CRUNCH_SRC"} = $Job->{script_version};  # use the given source tree in place
361 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {  # install the Python SDK from that tree into a virtualenv
363 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
364 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
365 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367 or croak ("setup.py in $src_path failed: exit ".($?>>8));
375 $build_script = <DATA>;  # the checkout-and-build script kept in this file's DATA section
377 Log (undef, "Install revision ".$Job->{script_version});
378 my $nodelist = join(",", @node);  # every allocated node, for srun --nodelist
380 if (!defined $no_clear_tmp) {
381 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383 my $cleanpid = fork();  # clean up in a child so this process can keep polling for freeze requests
386 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
387 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
392 last if $cleanpid == waitpid (-1, WNOHANG);  # poll every 0.1 s until the cleanup child exits
393 freeze_if_want_freeze ($cleanpid);
394 select (undef, undef, undef, 0.1);
396 Log (undef, "Clean-work-dir exited $?");
399 # Install requested code version
402 my @srunargs = ("srun",
403 "--nodelist=$nodelist",
404 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
407 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";  # the build script extracts the source tree into CRUNCH_SRC
411 my $treeish = $Job->{'script_version'};
413 # If we're running under crunch-dispatch, it will have pulled the
414 # appropriate source tree into its own repository, and given us that
415 # repo's path as $git_dir. If we're running a "local" job, and a
416 # script_version was specified, it's up to the user to provide the
417 # full path to a local repository in Job->{repository}.
419 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
420 # git-archive --remote where appropriate.
422 # TODO: Accept a locally-hosted Arvados repository by name or
423 # UUID. Use arvados.v1.repositories.list or .get to figure out the
424 # appropriate fetch-url.
425 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};  # first of: --git-dir, CRUNCH_DEFAULT_GIT_DIR, the job's repository
427 $ENV{"CRUNCH_SRC_URL"} = $repo;
429 if (-d "$repo/.git") {
430 # We were given a working directory, but we are only interested in
432 $repo = "$repo/.git";
435 # If this looks like a subversion r#, look for it in git-svn commit messages
437 if ($treeish =~ m{^\d{1,4}$}) {
438 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440 if ($gitlog =~ /^[a-f0-9]{40}$/) {  # a single 40-hex commit id
442 Log (undef, "Using commit $commit for script_version $treeish");
446 # If that didn't work, try asking git to look it up as a tree-ish.
448 if (!defined $commit) {
449 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451 if ($found =~ /^[0-9a-f]{40}$/s) {
453 if ($commit ne $treeish) {
454 # Make sure we record the real commit id in the database,
455 # frozentokey, logs, etc. -- instead of an abbreviation or a
456 # branch name which can become ambiguous or point to a
457 # different commit in the future.
458 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
459 Log (undef, "Using commit $commit for tree-ish $treeish");
460 if ($commit ne $treeish) {
461 $Job->{'script_version'} = $commit;
463 $Job->update_attributes('script_version' => $commit) or
464 croak("Error while updating job");
470 if (defined $commit) {
471 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
472 @execargs = ("sh", "-c",
473 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");  # remote side runs the build script it receives on stdin
474 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;  # tar of the commit, sent on stdin right after the build script
477 croak ("could not figure out commit id for $treeish");
480 my $installpid = fork();  # install in a child and poll, as for the cleanup above
481 if ($installpid == 0)
483 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
488 last if $installpid == waitpid (-1, WNOHANG);
489 freeze_if_want_freeze ($installpid);
490 select (undef, undef, undef, 0.1);
492 Log (undef, "Install exited $?");
497 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
498 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
501 # If this job requires a Docker image, install that.
502 my $docker_bin = "/usr/bin/docker.io";
503 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";  # "" when no image was requested
505 foreach (qw (script script_version script_parameters runtime_constraints))  # log the job's key attributes
509 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));  # references are logged as JSON
511 foreach (split (/\n/, $Job->{knobs}))  # and each knob line
513 Log (undef, "knob " . $_);
518 $main::success = undef;  # outcome undecided; the level loop repeats while this stays undef
524 my $thisround_succeeded = 0;  # per-round outcome counters (one round per task level)
525 my $thisround_failed = 0;
526 my $thisround_failed_multiple = 0;
528 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}  # lowest level first, then by jobstep index
529 or $a <=> $b } @jobstep_todo;
530 my $level = $jobstep[$jobstep_todo[0]]->{level};
531 Log (undef, "start level $level");
536 my @freeslot = (0..$#slot);  # every slot starts out free
539 my $progress_is_dirty = 1;
540 my $progress_stats_updated = 0;
542 update_progress_stats();
547 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
549 my $id = $jobstep_todo[$todo_ptr];
550 my $Jobstep = $jobstep[$id];
551 if ($Jobstep->{level} != $level)
556 pipe $reader{$id}, "writer" or croak ($!);  # the child's stdout/stderr come back through this pipe
557 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
558 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);  # the parent reads it without blocking
560 my $childslot = $freeslot[0];
561 my $childnode = $slot[$childslot]->{node};
562 my $childslotname = join (".",
563 $slot[$childslot]->{node}->{name},
564 $slot[$childslot]->{cpu});
565 my $childpid = fork();  # the child sets up the task environment and runs the step via srun
568 $SIG{'INT'} = 'DEFAULT';  # child: restore default signal handling
569 $SIG{'QUIT'} = 'DEFAULT';
570 $SIG{'TERM'} = 'DEFAULT';
572 foreach (values (%reader))
576 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
577 open(STDOUT,">&writer");
578 open(STDERR,">&writer");
583 delete $ENV{"GNUPGHOME"};
584 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};  # task environment for the crunch script
585 $ENV{"TASK_QSEQUENCE"} = $id;
586 $ENV{"TASK_SEQUENCE"} = $level;
587 $ENV{"JOB_SCRIPT"} = $Job->{script};
588 while (my ($param, $value) = each %{$Job->{script_parameters}}) {  # export each parameter as JOB_PARAMETER_<NAME IN UPPER CASE>
589 $param =~ tr/a-z/A-Z/;
590 $ENV{"JOB_PARAMETER_$param"} = $value;
592 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
593 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
594 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";  # per-task scratch dir, unique per jobstep and child pid
595 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
596 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
597 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
598 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
604 "--nodelist=".$childnode->{name},
605 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
606 "--job-name=$job_id.$id.$$",
608 my $build_script_to_send = "";
610 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
611 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
612 ."&& cd $ENV{CRUNCH_TMP} ";
615 $build_script_to_send = $build_script;
619 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";  # run the task under arv-mount at TASK_KEEPMOUNT
622 $command .= "$docker_bin run -i -a stdin -a stdout -a stderr ";
623 # Dynamically configure the container to use the host system as its
624 # DNS server. Get the host's global addresses from the ip command,
625 # and turn them into docker --dns options using gawk.
627 q{$(ip -o address show scope global |
628 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
629 foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))  # bind-mount these host paths at the same path in the container
631 $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
633 while (my ($env_key, $env_val) = each %ENV)  # pass the whole environment into the container
635 $command .= "-e \Q$env_key=$env_val\E ";
637 $command .= "$docker_image ";
639 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
640 my @execargs = ('bash', '-c', $command);
641 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
645 if (!defined $childpid)
652 $proc{$childpid} = { jobstep => $id,  # parent: remember which jobstep and slot this child serves
655 jobstepname => "$job_id.$id.$childpid",
657 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
658 $slot[$childslot]->{pid} = $childpid;
660 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
661 Log ($id, "child $childpid started on $childslotname");
662 $Jobstep->{starttime} = time;
663 $Jobstep->{node} = $childnode->{name};
664 $Jobstep->{slotindex} = $childslot;  # lets preprocess_stderr back off this step's node
665 delete $Jobstep->{stderr};
666 delete $Jobstep->{finishtime};
668 splice @jobstep_todo, $todo_ptr, 1;  # started: remove it from the todo queue
671 $progress_is_dirty = 1;
675 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
677 last THISROUND if $main::please_freeze;
678 if ($main::please_info)
680 $main::please_info = 0;
684 update_progress_stats();
691 check_refresh_wanted();
693 update_progress_stats();
694 select (undef, undef, undef, 0.1);
696 elsif (time - $progress_stats_updated >= 30)  # periodic status update even when nothing changed
698 update_progress_stats();
700 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||  # repeat failures dominate this round
701 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
703 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
704 .($thisround_failed+$thisround_succeeded)
705 .") -- giving up on this round";
706 Log (undef, $message);
710 # move slots from freeslot to holdslot (or back to freeslot) if necessary
711 for (my $i=$#freeslot; $i>=0; $i--) {
712 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
713 push @holdslot, (splice @freeslot, $i, 1);
716 for (my $i=$#holdslot; $i>=0; $i--) {
717 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
718 push @freeslot, (splice @holdslot, $i, 1);
722 # give up if no nodes are succeeding
723 if (!grep { $_->{node}->{losing_streak} == 0 &&
724 $_->{node}->{hold_count} < 4 } @slot) {
725 my $message = "Every node has failed -- giving up on this round";
726 Log (undef, $message);
733 push @freeslot, splice @holdslot;  # return all held slots to the free list
734 $slot[$_]->{node}->{losing_streak} = 0 foreach @freeslot;  # reset each free slot's node losing streak (a loop, not void-context map)
737 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
740 if ($main::please_continue) {
741 $main::please_continue = 0;
744 if ($main::please_info) { $main::please_info = 0; freeze(); collate_output(); save_meta(1); }  # ALRM: checkpoint and keep running
748 check_refresh_wanted();
750 update_progress_stats();
751 select (undef, undef, undef, 0.1);
752 killem (keys %proc) if $main::please_freeze;
756 update_progress_stats();
757 freeze_if_want_freeze();
760 if (!defined $main::success)
763 $thisround_succeeded == 0 &&
764 ($thisround_failed == 0 || $thisround_failed > 4))
766 my $message = "stop because $thisround_failed tasks failed and none succeeded";
767 Log (undef, $message);
776 goto ONELEVEL if !defined $main::success;  # outcome still undecided: run the next level
779 release_allocation();  # Wrap up: release nodes, collate output, record the final job state.
782 $Job->update_attributes('output' => collate_output(),
784 'success' => $Job->{'output'} && $main::success,  # success also requires an output locator
785 'finished_at' => scalar gmtime)
788 if ($Job->{'output'})
791 my $manifest_text = `$arv_cli keep get ''\Q$Job->{'output'}\E`;  # use $arv_cli like the other Keep calls, so ARV_CLI is honored here too
792 $arv->{'collections'}->{'create'}->execute('collection' => {  # register the output manifest as a collection
793 'uuid' => $Job->{'output'},
794 'manifest_text' => $manifest_text,
796 if ($Job->{'output_is_persistent'}) {  # and link it to the current user when requested
797 $arv->{'links'}->{'create'}->execute('link' => {
798 'tail_kind' => 'arvados#user',
799 'tail_uuid' => $User->{'uuid'},
800 'head_kind' => 'arvados#collection',
801 'head_uuid' => $Job->{'output'},
802 'link_class' => 'resources',
808 Log (undef, "Failed to register output manifest: $@");
812 Log (undef, "finish");
819 sub update_progress_stats  # push todo/done/running counts to the job's tasks_summary and log them, when anything changed
821 $progress_stats_updated = time;  # stamped even when clean; the main loop's 30-second refresh keys off this
822 return if !$progress_is_dirty;
823 my ($todo, $done, $running) = (scalar @jobstep_todo,
824 scalar @jobstep_done,
825 scalar @slot - scalar @freeslot - scalar @holdslot);  # running = slots neither free nor held
826 $Job->{'tasks_summary'} ||= {};
827 $Job->{'tasks_summary'}->{'todo'} = $todo;
828 $Job->{'tasks_summary'}->{'done'} = $done;
829 $Job->{'tasks_summary'}->{'running'} = $running;
831 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
833 Log (undef, "status: $done done, $running running, $todo todo");
834 $progress_is_dirty = 0;
841 my $pid = waitpid (-1, WNOHANG);  # Reap one exited child, if any, without blocking.
842 return 0 if $pid <= 0;  # nothing has exited yet
844 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
846 . $slot[$proc{$pid}->{slot}]->{cpu});
847 my $jobstepid = $proc{$pid}->{jobstep};
848 my $elapsed = time - $proc{$pid}->{time};
849 my $Jobstep = $jobstep[$jobstepid];
851 my $childstatus = $?;  # wait status of the reaped child
852 my $exitvalue = $childstatus >> 8;
853 my $exitinfo = sprintf("exit %d signal %d%s",
856 ($childstatus & 128 ? ' core dump' : ''));
857 $Jobstep->{'arvados_task'}->reload;  # pick up the success flag the task itself recorded
858 my $task_success = $Jobstep->{'arvados_task'}->{success};
860 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
862 if (!defined $task_success) {
863 # task did not indicate one way or the other --> fail
864 $Jobstep->{'arvados_task'}->{success} = 0;
865 $Jobstep->{'arvados_task'}->save;
872 $temporary_fail ||= $Jobstep->{node_fail};  # srun reported a node problem for this step
873 $temporary_fail ||= ($exitvalue == 111);  # exit status 111 also marks a retryable failure
876 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;  # this step had already failed before
878 # Check for signs of a failed or misconfigured node
879 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
880 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
881 # Don't count this against jobstep failure thresholds if this
882 # node is already suspected faulty and srun exited quickly
883 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
885 Log ($jobstepid, "blaming failure on suspect node " .
886 $slot[$proc{$pid}->{slot}]->{node}->{name});
887 $temporary_fail ||= 1;
889 ban_node_by_slot($proc{$pid}->{slot});
892 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
893 ++$Jobstep->{'failures'},
894 $temporary_fail ? 'temporary' : 'permanent',  # no trailing space: it used to log "(#1, temporary ) after ..."
897 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
898 # Give up on this task, and the whole job
900 $main::please_freeze = 1;
903 # Put this task back on the todo queue
904 push @jobstep_todo, $jobstepid;
906 $Job->{'tasks_summary'}->{'failed'}++;
910 ++$thisround_succeeded;
911 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;  # a success clears the node's losing streak and hold
912 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
913 push @jobstep_done, $jobstepid;
914 Log ($jobstepid, "success in $elapsed seconds");
916 $Jobstep->{exitcode} = $childstatus;
917 $Jobstep->{finishtime} = time;
918 process_stderr ($jobstepid, $task_success);
919 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
921 close $reader{$jobstepid};  # done with this step's pipe and slot
922 delete $reader{$jobstepid};
923 delete $slot[$proc{$pid}->{slot}]->{pid};
924 push @freeslot, $proc{$pid}->{slot};
928 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(  # queue any tasks this task created
930 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
932 'order' => 'qsequence'
934 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
936 'level' => $arvados_task->{'sequence'},  # a new task's level is its 'sequence'
938 'arvados_task' => $arvados_task
940 push @jobstep, $jobstep;
941 push @jobstep_todo, $#jobstep;
944 $progress_is_dirty = 1;
948 sub check_refresh_wanted  # re-read the job's cancellation fields when the refresh trigger file has been touched
950 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
951 if (@stat && $stat[9] > $latest_refresh) {  # trigger mtime newer than the last refresh
952 $latest_refresh = scalar time;
954 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
955 for my $attr ('cancelled_at',
956 'cancelled_by_user_uuid',
957 'cancelled_by_client_uuid') {
958 $Job->{$attr} = $Job2->{$attr};  # copy only the cancellation fields
960 if ($Job->{'cancelled_at'}) {
961 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
962 " by user " . $Job->{cancelled_by_user_uuid});
964 $main::please_freeze = 1;  # cancelled: stop the job
972 # return if the kill list was checked <4 seconds ago
973 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
977 $squeue_kill_checked = time;
979 # use killem() on procs whose killtime is reached
982 if (exists $proc{$_}->{killtime}
983 && $proc{$_}->{killtime} <= time)
989 # return if the squeue was checked <60 seconds ago
990 if (defined $squeue_checked && $squeue_checked > time - 60)
994 $squeue_checked = time;
998 # here is an opportunity to check for mysterious problems with local procs
1002 # get a list of steps still running
1003 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;  # the trailing "ok" line shows squeue itself succeeded
1005 if ($squeue[-1] ne "ok")
1011 # which of my jobsteps are running, according to squeue?
1015 if (/^(\d+)\.(\d+) (\S+)/)  # "<jobid>.<stepid> <step name>", per the '%i %j' format
1017 if ($1 eq $ENV{SLURM_JOBID})  # only steps of this allocation
1024 # which of my active child procs (>60s old) were not mentioned by squeue?
1025 foreach (keys %proc)
1027 if ($proc{$_}->{time} < time - 60
1028 && !exists $ok{$proc{$_}->{jobstepname}}  # jobstepname matches the srun --job-name given to the step
1029 && !exists $proc{$_}->{killtime})
1031 # kill this proc if it hasn't exited in 30 seconds
1032 $proc{$_}->{killtime} = time + 30;
1038 sub release_allocation  # give back this job's SLURM allocation
1042 Log (undef, "release job allocation");
1043 system "scancel $ENV{SLURM_JOBID}";  # exit status is not checked
1051 foreach my $job (keys %reader)  # drain each running step's non-blocking output pipe
1054 while (0 < sysread ($reader{$job}, $buf, 8192))  # 8 KiB at a time until nothing more is available
1056 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1057 $jobstep[$job]->{stderr} .= $buf;
1058 preprocess_stderr ($job);
1059 if (length ($jobstep[$job]->{stderr}) > 16384)  # bound the buffer: drop its oldest 8 KiB once past 16 KiB
1061 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1070 sub preprocess_stderr  # log each complete buffered line of a step's output and react to srun errors
1074 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {  # one complete line at a time
1076 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";  # drop the line and its newline from the buffer
1077 Log ($job, "stderr $line");
1078 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {  # allocation lost: request a freeze. NOTE(review): reads SLURM_JOB_ID while the rest of the script uses SLURM_JOBID -- confirm both are set
1080 $main::please_freeze = 1;
1082 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {  # node-level failure
1083 $jobstep[$job]->{node_fail} = 1;  # lets the reaper count this step's failure as temporary
1084 ban_node_by_slot($jobstep[$job]->{slotindex});
1093 my $task_success = shift;
1094 preprocess_stderr ($job);  # log any complete lines first
1097 Log ($job, "stderr $_");  # then whatever remains in the buffer, line by line
1098 } split ("\n", $jobstep[$job]->{stderr});
1104 my ($keep, $child_out, $output_block);  # Fetch one Keep block's content via "$arv_cli keep get"; undef on failure.
1106 my $cmd = "$arv_cli keep get \Q$hash\E";
1107 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1108 $output_block = do { local $/; <$keep> };  # read to EOF: a single sysread() on a pipe can return only part of the block
1110 close $keep or return; return $output_block;  # a non-zero exit from the fetch command yields undef, which the caller reports as a failed fetch
1115 Log (undef, "collate");  # Concatenate successful steps' outputs into one manifest stored via "keep put".
1117 my ($child_out, $child_in);
1118 my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');  # manifest text is written to this command's stdin
1122 next if (!exists $_->{'arvados_task'}->{output} ||  # skip steps with no output, no success, or a non-zero exit
1123 !$_->{'arvados_task'}->{'success'} ||
1124 $_->{'exitcode'} != 0);
1125 my $output = $_->{'arvados_task'}->{output};
1126 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)  # not a bare locator: the output is manifest text itself
1128 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1129 print $child_in $output;
1131 elsif (@jobstep == 1)  # a single-step job's locator is used as the job output directly
1133 $joboutput = $output;
1136 elsif (defined (my $outblock = fetch_block ($output)))  # otherwise fetch the referenced manifest and append it
1138 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1139 print $child_in $outblock;
1143 Log (undef, "XXX fetch_block($output) failed XXX");
1149 if (!defined $joboutput) {
1150 my $s = IO::Select->new($child_out);
1151 if ($s->can_read(120)) {  # wait up to 120 s for the put command's reply
1152 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1155 Log (undef, "timed out reading from 'arv keep put'");
1162 Log (undef, "output $joboutput");
1163 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1167 Log (undef, "output undef");
1177 my $sig = 2; # SIGINT first
1178 if (exists $proc{$_}->{"sent_$sig"} &&
1179 time - $proc{$_}->{"sent_$sig"} > 4)
1181 $sig = 15; # SIGTERM if SIGINT doesn't work
1183 if (exists $proc{$_}->{"sent_$sig"} &&
1184 time - $proc{$_}->{"sent_$sig"} > 4)
1186 $sig = 9; # SIGKILL if SIGTERM doesn't work
1188 if (!exists $proc{$_}->{"sent_$sig"})  # send each signal level at most once per pid
1190 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1192 select (undef, undef, undef, 0.1);
1195 kill $sig, $_; # srun wants two SIGINT to really interrupt
1197 $proc{$_}->{"sent_$sig"} = time;  # drives the 4-second escalation checks above
1198 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};  # how long the child had run when the kill started
1208 vec($bits,fileno($_),1) = 1;  # set this handle's file-descriptor bit (presumably a bit vector for select())
1214 sub Log # ($jobstep_id, $logmessage) -- prefix with job id and pid, escape non-printables, write to STDERR and $metastream
1216 if ($_[1] =~ /\n/) {  # multi-line message: handle each line separately
1217 for my $line (split (/\n/, $_[1])) {
1222 my $fh = select STDERR; $|=1; select $fh;  # autoflush STDERR without changing the selected handle
1223 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);  # "<job uuid> <pid> <jobstep id> <text>"
1224 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;  # anything outside space..~ becomes \ooo octal
1227 if ($metastream || -t STDERR) {  # compute a timestamp only when it will be printed
1228 my @gmtime = gmtime;
1229 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1230 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1232 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);  # timestamp on STDERR only for a terminal
1235 print $metastream $datetime . " " . $message;
1242 my ($package, $file, $line) = caller;  # report the caller's location, like Carp::croak
1243 my $message = "@_ at $file line $line\n";
1244 Log (undef, $message);
1245 freeze() if @jobstep_todo;  # work still queued: checkpoint and collate partial output
1246 collate_output() if @jobstep_todo;
1248 save_meta() if $metastream;  # final (non-checkpoint) log upload
1255 return if !$job_has_uuid;  # only when --job named an existing job UUID
1256 $Job->update_attributes('running' => 0,  # mark the job as no longer running
1258 'finished_at' => scalar gmtime);
1264 my $justcheckpoint = shift; # false if this will be the last meta saved
1265 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1267 $local_logfile->flush;  # make sure everything logged so far is on disk before uploading
1268 my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1269 . quotemeta($local_logfile->filename);
1270 my $loglocator = `$cmd`;  # upload the log; the command's stdout is the locator
1271 die "system $cmd failed: $?" if $?;
1273 $local_logfile = undef; # the temp file is automatically deleted
1274 Log (undef, "log manifest is $loglocator");
1275 $Job->{'log'} = $loglocator;
1276 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1280 sub freeze_if_want_freeze  # if a freeze was requested: release nodes, then kill and reap children (plus any pids passed in)
1282 if ($main::please_freeze)
1284 release_allocation();
1287 # kill some srun procs before freeze+stop
1288 $proc{$_} = {} foreach @_;  # register the given pids (e.g. cleanup/install children) so killem() signals them too
1291 killem (keys %proc);
1292 select (undef, undef, undef, 0.1);
1294 while (($died = waitpid (-1, WNOHANG)) > 0)
1296 delete $proc{$died};
1311 Log (undef, "Freeze not implemented");  # only logs: checkpointing is not implemented
1318 croak ("Thaw not implemented");  # resuming a frozen job is not supported
1334 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;  # unescape: backslash-n becomes a newline, any other backslash-X becomes X
1341 my $srunargs = shift;  # srun prefix argv (arrayref); callers pass (srun argv, command argv, opts, stdin text)
1342 my $execargs = shift;  # command argv (arrayref)
1343 my $opts = shift || {};
1345 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;  # outside SLURM, run the command directly on this host
1346 print STDERR (join (" ",
1347 map { / / ? "'$_'" : $_ }  # quote words containing spaces in the debug echo
1350 if $ENV{CRUNCH_DEBUG};
1352 if (defined $stdin) {
1353 my $child = open STDIN, "-|";  # fork; the child's STDOUT becomes our STDIN
1354 defined $child or die "no fork: $!";
1356 print $stdin or die $!;  # child side: write the stdin text, then close
1357 close STDOUT or die $!;
1362 return system (@$args) if $opts->{fork};  # opts->{fork}: run and return system()'s status instead of exec'ing
1365 warn "ENV size is ".length(join(" ",%ENV));  # presumably reached after a failed exec; an oversized environment is one possible cause
1366 die "exec failed: $!: @$args";
1370 sub ban_node_by_slot {
1371 # Don't start any new jobsteps on this node for 60 seconds
1373 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1374 $slot[$slotid]->{node}->{hold_count}++;  # counted by the main loop's "hold_count < 4" give-up check
1375 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1380 my ($lockfile, $error_message) = @_;
1381 open L, ">", $lockfile or croak("$lockfile: $!");  # package-global handle L stays open (keeping the lock) after return; reopening it in a later call closes the previous handle first
1382 if (!flock L, LOCK_EX|LOCK_NB) {  # non-blocking: fail at once if another process holds the lock
1383 croak("Can't lock $lockfile: $error_message\n");
1390 # checkout-and-build
1394 my $destdir = $ENV{"CRUNCH_SRC"};  # where the source tree is unpacked
1395 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1396 my $repo = $ENV{"CRUNCH_SRC_URL"};
1398 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";  # lock file for this $destdir
1400 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {  # tree already built at this commit
1404 unlink "$destdir.commit";  # forget the old build marker before rebuilding
1405 open STDOUT, ">", "$destdir.log";  # build output goes to $destdir.log
1406 open STDERR, ">&STDOUT";
1409 my @git_archive_data = <DATA>;  # tar data appended after this script, if any
1410 if (@git_archive_data) {
1411 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1412 print TARX @git_archive_data;
1414 die "'tar -C $destdir -xf -' exited $?: $!";
1419 chomp ($pwd = `pwd`);
1420 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1423 for my $src_path ("$destdir/arvados/sdk/python") {  # install the Python SDK into a virtualenv at $install_dir
1425 shell_or_die ("virtualenv", $install_dir);
1426 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1430 if (-e "$destdir/crunch_scripts/install") {  # run the first install hook found
1431 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1432 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1434 shell_or_die ("./tests/autotests.sh", $install_dir);
1435 } elsif (-e "./install.sh") {
1436 shell_or_die ("./install.sh", $install_dir);
1440 unlink "$destdir.commit.new";
1441 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";  # record the built commit: symlink, then rename into place
1442 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1451 if ($ENV{"DEBUG"}) {
1452 print STDERR "@_\n";
1455 or die "@_ failed: $! exit 0x".sprintf("%x",$?);