2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Digest::MD5 qw(md5_hex);
85 use File::Path qw( make_path );
# Exit status used when another process already owns (or has started) the
# job, so the parent dispatcher does not mark the job as failed (see the
# lock checks further down).
87 use constant EX_TEMPFAIL => 75;
# Resolve the scratch area. CRUNCH_TMP defaults to $TMPDIR/crunch-job and
# gets a "-<uid>" suffix unless we run as the "crunch" user or as root.
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92 if ($ENV{"USER"} ne "crunch" && $< != 0) {
93 # use a tmp dir unique for my uid
94 $ENV{"CRUNCH_TMP"} .= "-$<";
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Locations derived from CRUNCH_TMP: the per-job work area and the install
# prefix used for virtualenvs/builds. CRUNCH_WORK is a deprecated alias.
# NOTE(review): mkdir's result is not checked; presumably JOB_WORK may
# already exist from an earlier run.
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
# Parse command-line options (documented in the POD at the top of the file).
114 GetOptions('force-unlock' => \$force_unlock,
115 'git-dir=s' => \$git_dir,
116 'job=s' => \$jobspec,
117 'job-api-token=s' => \$job_api_token,
118 'no-clear-tmp' => \$no_clear_tmp,
119 'resume-stash=s' => \$resume_stash,
# An explicit --job-api-token overrides any ARVADOS_API_TOKEN in the environment.
122 if (defined $job_api_token) {
123 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# SLURM mode requires both SLURM_JOBID and SLURM_NODELIST to be set.
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A job spec consisting only of [-a-z0-9] is treated as an existing job
# uuid; anything else (e.g. a JSON document) is a "local" job.
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
# CRUNCH_DEBUG is forced to 1 or 0; the condition choosing between them is
# not visible in this listing.
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
# Connect to the Arvados API and identify the user this process acts as.
142 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
# Job given by uuid: fetch it, and unless --force-unlock was given, refuse
# to proceed if it already looks claimed or started.
153 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154 if (!$force_unlock) {
155 # If some other crunch-job process has grabbed this job (or we see
156 # other evidence that the job is already underway) we exit
157 # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158 # mark the job as failed.
159 if ($Job->{'is_locked_by_uuid'}) {
160 Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
# Test definedness directly. The previous `ne undef` stringified undef
# (raising "uninitialized value" warnings) and only worked because
# undef and '' compare equal as strings.
163 if (defined $Job->{'success'}) {
164 Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
167 if ($Job->{'running'}) {
168 Log(undef, "Job 'running' flag is already set");
171 if ($Job->{'started_at'}) {
172 Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Job given as a JSON document: decode it, require the essential fields,
# and register it with the API as a new job locked by the current user.
179 $Job = JSON::decode_json($jobspec);
183 map { croak ("No $_ specified") unless $Job->{$_} }
184 qw(script script_version script_parameters);
187 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188 $Job->{'started_at'} = gmtime;
190 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
# The job uuid names the Keep log file and prefixes log lines / srun job names.
194 $job_id = $Job->{'uuid'};
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
# max_tasks_per_node defaults to 0, meaning "no per-node cap" when the
# slot list is built below.
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Build @sinfo as "<ncpus> <nodelist>" lines: a localhost entry sized from
# /proc/cpuinfo (presumably used only outside SLURM; the enclosing
# condition is not visible here) and sinfo output for SLURM_NODELIST.
204 Log (undef, "check slurm allocation");
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
211 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212 push @sinfo, "$localcpus localhost";
214 if (exists $ENV{SLURM_NODELIST})
216 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
# For each sinfo line: cap ncpus at max_tasks_per_node (if non-zero), then
# expand SLURM-style compressed node lists such as "host[1-3,5]" into
# individual node names.
220 my ($ncpus, $slurm_nodelist) = split;
221 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
224 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
227 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
230 foreach (split (",", $ranges))
243 push @nodelist, map {
245 $n =~ s/\[[-,\d]+\]/$_/;
252 push @nodelist, $nodelist;
# One node record per host, and one slot per usable CPU on that host.
255 foreach my $nodename (@nodelist)
257 Log (undef, "node $nodename - $ncpus slots");
258 my $node = { name => $nodename,
262 foreach my $cpu (1..$ncpus)
264 push @slot, { node => $node,
268 push @node, @nodelist;
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
283 # Claim this job, and make sure nobody else does
# UUIDs are strings, so the lock holder must be compared with 'eq'. The
# previous numeric '==' compared only leading digits (usually none, so
# 0 == 0) and therefore could never detect a different lock holder.
284 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
286 Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
289 $Job->update_attributes('started_at' => scalar gmtime,
292 'tasks_summary' => { 'failed' => 0,
# Signal handlers only set flags that the main loop polls; TERM goes
# straight to croak().
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Export NOCACHE* knobs ("NAME=value", one per line) into the environment.
# A plain loop replaces the former void-context grep used only for its
# side effect.
314 for (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ }
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
# Scheduler bookkeeping: @jobstep_todo / @jobstep_done hold indexes into @jobstep.
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
# Baseline for check_refresh_wanted(): only trigger-file changes newer than
# this timestamp cause a refresh.
327 my $latest_refresh = scalar time;
# Resume from a previously frozen state if one is recorded; otherwise seed
# the queue with a single level-0 task created through the API.
331 if (defined $Job->{thawedfromkey})
333 thaw ($Job->{thawedfromkey});
337 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338 'job_uuid' => $Job->{'uuid'},
343 push @jobstep, { 'level' => 0,
345 'arvados_task' => $first_task,
347 push @jobstep_todo, 0;
# Exclusive, non-blocking lock on CRUNCH_TMP so two jobs never share this
# scratch area on the same host.
353 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# By default the script runs from the job's script_version as given.
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# Local jobs whose script_version is an absolute path are run from that
# directory (presumably the $skip_install branch; its `if` line is not
# visible): optionally wipe scratch dirs, then build the Python SDK from the
# tree into a local virtualenv under CRUNCH_TMP/opt.
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
365 if (!defined $no_clear_tmp) {
366 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367 system($clear_tmp_cmd) == 0
368 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
370 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
373 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
377 or croak ("setup.py in $src_path failed: exit ".($?>>8));
# Otherwise: read the checkout-and-build script from the __DATA__ section
# (presumably slurped with $/ undefined on a line not visible here) so it
# can be sent to every node.
385 $build_script = <DATA>;
387 Log (undef, "Install revision ".$Job->{script_version});
388 my $nodelist = join(",", @node);
# Unless --no-clear-tmp: in a child, srun a cleanup on all nodes that
# lazily unmounts leftover *keep FUSE mounts and removes work/opt/src dirs.
# The parent polls for the child with WNOHANG every 0.1 s, honoring freeze
# requests meanwhile.
390 if (!defined $no_clear_tmp) {
391 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
393 my $cleanpid = fork();
396 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
402 last if $cleanpid == waitpid (-1, WNOHANG);
403 freeze_if_want_freeze ($cleanpid);
404 select (undef, undef, undef, 0.1);
406 Log (undef, "Clean-work-dir exited $?");
409 # Install requested code version
# srun prefix that runs the install once per allocated node.
412 my @srunargs = ("srun",
413 "--nodelist=$nodelist",
414 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
# Nodes unpack and build the source tree into $CRUNCH_TMP/src.
416 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
421 my $treeish = $Job->{'script_version'};
423 # If we're running under crunch-dispatch, it will have pulled the
424 # appropriate source tree into its own repository, and given us that
425 # repo's path as $git_dir. If we're running a "local" job, and a
426 # script_version was specified, it's up to the user to provide the
427 # full path to a local repository in Job->{repository}.
429 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430 # git-archive --remote where appropriate.
432 # TODO: Accept a locally-hosted Arvados repository by name or
433 # UUID. Use arvados.v1.repositories.list or .get to figure out the
434 # appropriate fetch-url.
435 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
437 $ENV{"CRUNCH_SRC_URL"} = $repo;
439 if (-d "$repo/.git") {
440 # We were given a working directory, but we are only interested in
442 $repo = "$repo/.git";
445 # If this looks like a subversion r#, look for it in git-svn commit messages
447 if ($treeish =~ m{^\d{1,4}$}) {
448 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
450 if ($gitlog =~ /^[a-f0-9]{40}$/) {
452 Log (undef, "Using commit $commit for script_version $treeish");
456 # If that didn't work, try asking git to look it up as a tree-ish.
458 if (!defined $commit) {
459 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
461 if ($found =~ /^[0-9a-f]{40}$/s) {
463 if ($commit ne $treeish) {
464 # Make sure we record the real commit id in the database,
465 # frozentokey, logs, etc. -- instead of an abbreviation or a
466 # branch name which can become ambiguous or point to a
467 # different commit in the future.
468 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469 Log (undef, "Using commit $commit for tree-ish $treeish");
470 if ($commit ne $treeish) {
471 $Job->{'script_version'} = $commit;
473 $Job->update_attributes('script_version' => $commit) or
474 croak("Error while updating job");
# With a resolved commit, each node receives the build script followed by
# `git archive` of that commit on stdin and runs it with `perl -`.
480 if (defined $commit) {
481 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482 @execargs = ("sh", "-c",
483 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
487 croak ("could not figure out commit id for $treeish");
# Run the install in a child process; the parent polls with WNOHANG every
# 0.1 s and honors freeze requests while waiting.
490 my $installpid = fork();
491 if ($installpid == 0)
493 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
498 last if $installpid == waitpid (-1, WNOHANG);
499 freeze_if_want_freeze ($installpid);
500 select (undef, undef, undef, 0.1);
502 Log (undef, "Install exited $?");
507 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
508 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
511 # If this job requires a Docker image, install that.
512 my $docker_bin = "/usr/bin/docker.io";
513 my ($docker_locator, $docker_stream, $docker_hash);
# find_docker_image() returns (stream, hash) or (undef, undef); a missing
# hash aborts the job.
514 if ($docker_locator = $Job->{docker_image_locator}) {
515 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
518 croak("No Docker image hash found from locator $docker_locator");
520 $docker_stream =~ s/^\.//;
# Shell snippet run on every node (via srun in a forked child, polled with
# WNOHANG): `docker load` the image tarball from Keep only when docker does
# not already list the image hash. A non-zero result aborts the job.
521 my $docker_install_script = qq{
522 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
523 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
526 my $docker_pid = fork();
527 if ($docker_pid == 0)
529 srun (["srun", "--nodelist=" . join(',', @node)],
530 ["/bin/sh", "-ec", $docker_install_script]);
535 last if $docker_pid == waitpid (-1, WNOHANG);
536 freeze_if_want_freeze ($docker_pid);
537 select (undef, undef, undef, 0.1);
541 croak("Installing Docker image from $docker_locator returned exit code $?");
# Log the job's defining attributes (references are JSON-encoded) and each knob.
545 foreach (qw (script script_version script_parameters runtime_constraints))
549 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
551 foreach (split (/\n/, $Job->{knobs}))
553 Log (undef, "knob " . $_);
# Main scheduler. Each round picks the lowest pending task level and runs
# every todo task at that level across the free slots. The round repeats
# (via `goto ONELEVEL` at the end) while $main::success is still undecided.
558 $main::success = undef;
564 my $thisround_succeeded = 0;
565 my $thisround_failed = 0;
566 my $thisround_failed_multiple = 0;
# Order the queue by level, then by jobstep index.
568 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
569 or $a <=> $b } @jobstep_todo;
570 my $level = $jobstep[$jobstep_todo[0]]->{level};
571 Log (undef, "start level $level");
# Every slot starts the round free.
576 my @freeslot = (0..$#slot);
579 my $progress_is_dirty = 1;
580 my $progress_stats_updated = 0;
582 update_progress_stats();
# Walk the todo queue, starting a child process for each task at $level.
587 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
589 my $id = $jobstep_todo[$todo_ptr];
590 my $Jobstep = $jobstep[$id];
591 if ($Jobstep->{level} != $level)
# Non-blocking pipe: the parent reads the child's stdout/stderr from $reader{$id}.
596 pipe $reader{$id}, "writer" or croak ($!);
597 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
598 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Use the first free slot; its node decides where the task runs.
600 my $childslot = $freeslot[0];
601 my $childnode = $slot[$childslot]->{node};
602 my $childslotname = join (".",
603 $slot[$childslot]->{node}->{name},
604 $slot[$childslot]->{cpu});
605 my $childpid = fork();
# Child side (the branch line is not visible): restore default signal
# handling and route stdout/stderr into the pipe.
608 $SIG{'INT'} = 'DEFAULT';
609 $SIG{'QUIT'} = 'DEFAULT';
610 $SIG{'TERM'} = 'DEFAULT';
612 foreach (values (%reader))
616 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
617 open(STDOUT,">&writer");
618 open(STDERR,">&writer");
# Environment seen by the crunch script. Script parameters are exported as
# JOB_PARAMETER_<UPPERCASED NAME>.
623 delete $ENV{"GNUPGHOME"};
624 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
625 $ENV{"TASK_QSEQUENCE"} = $id;
626 $ENV{"TASK_SEQUENCE"} = $level;
627 $ENV{"JOB_SCRIPT"} = $Job->{script};
628 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
629 $param =~ tr/a-z/A-Z/;
630 $ENV{"JOB_PARAMETER_$param"} = $value;
632 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
633 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
634 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
635 $ENV{"HOME"} = $ENV{"TASK_WORK"};
636 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
637 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
638 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
639 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun arguments pinning the task to one CPU on the chosen node.
645 "--nodelist=".$childnode->{name},
646 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
647 "--job-name=$job_id.$id.$$",
# Shell command: reset the task work dir, optionally send the build script,
# mount Keep with arv-mount, then run the script under crunchstat.
649 my $build_script_to_send = "";
651 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
652 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
653 ."&& cd $ENV{CRUNCH_TMP} ";
656 $build_script_to_send = $build_script;
660 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Docker case (condition line not visible): run the script inside the
# job's image with source and Keep mounted read-only, passing
# ARVADOS_/JOB_/TASK_ variables through (TASK_KEEPMOUNT remapped to /keep).
663 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665 # Dynamically configure the container to use the host system as its
666 # DNS server. Get the host's global addresses from the ip command,
667 # and turn them into docker --dns options using gawk.
669 q{$(ip -o address show scope global |
670 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
672 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
673 $command .= "--env=\QHOME=/home/crunch\E ";
674 while (my ($env_key, $env_val) = each %ENV)
676 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
677 if ($env_key eq "TASK_KEEPMOUNT") {
678 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
681 $command .= "--env=\Q$env_key=$env_val\E ";
685 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
686 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
687 $command .= "\Q$docker_hash\E ";
688 $command .= "stdbuf --output=0 --error=0 ";
689 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-Docker case: run the script directly from CRUNCH_SRC.
692 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
693 $command .= "stdbuf --output=0 --error=0 ";
694 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
# srun() does not return unless its exec failed.
697 my @execargs = ('bash', '-c', $command);
698 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
699 # exec() failed, we assume nothing happened.
700 Log(undef, "srun() failed on build script");
# Parent side: record the child, mark its slot busy, and dequeue the task.
704 if (!defined $childpid)
711 $proc{$childpid} = { jobstep => $id,
714 jobstepname => "$job_id.$id.$childpid",
716 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
717 $slot[$childslot]->{pid} = $childpid;
719 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
720 Log ($id, "child $childpid started on $childslotname");
721 $Jobstep->{starttime} = time;
722 $Jobstep->{node} = $childnode->{name};
723 $Jobstep->{slotindex} = $childslot;
724 delete $Jobstep->{stderr};
725 delete $Jobstep->{finishtime};
727 splice @jobstep_todo, $todo_ptr, 1;
730 $progress_is_dirty = 1;
734 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
# Between dispatches: honor freeze/info requests, refresh job state, and
# update progress at least every 30 s.
736 last THISROUND if $main::please_freeze;
737 if ($main::please_info)
739 $main::please_info = 0;
743 update_progress_stats();
750 check_refresh_wanted();
752 update_progress_stats();
753 select (undef, undef, undef, 0.1);
755 elsif (time - $progress_stats_updated >= 30)
757 update_progress_stats();
# Give up on the round if tasks keep failing repeatedly.
759 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
760 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
762 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
763 .($thisround_failed+$thisround_succeeded)
764 .") -- giving up on this round";
765 Log (undef, $message);
769 # move slots from freeslot to holdslot (or back to freeslot) if necessary
770 for (my $i=$#freeslot; $i>=0; $i--) {
771 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
772 push @holdslot, (splice @freeslot, $i, 1);
775 for (my $i=$#holdslot; $i>=0; $i--) {
776 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
777 push @freeslot, (splice @holdslot, $i, 1);
781 # give up if no nodes are succeeding
782 if (!grep { $_->{node}->{losing_streak} == 0 &&
783 $_->{node}->{hold_count} < 4 } @slot) {
784 my $message = "Every node has failed -- giving up on this round";
785 Log (undef, $message);
# Return held slots to the free list and reset every free node's losing streak.
792 push @freeslot, splice @holdslot;
793 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
# Nothing more to start at this level: wait for the remaining children,
# still honoring continue/info/refresh/freeze requests.
796 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
799 if ($main::please_continue) {
800 $main::please_continue = 0;
803 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
807 check_refresh_wanted();
809 update_progress_stats();
810 select (undef, undef, undef, 0.1);
811 killem (keys %proc) if $main::please_freeze;
815 update_progress_stats();
816 freeze_if_want_freeze();
# End-of-round decision: stop when tasks failed and none succeeded;
# otherwise loop back for the next level while success is undecided.
819 if (!defined $main::success)
822 $thisround_succeeded == 0 &&
823 ($thisround_failed == 0 || $thisround_failed > 4))
825 my $message = "stop because $thisround_failed tasks failed and none succeeded";
826 Log (undef, $message);
835 goto ONELEVEL if !defined $main::success;
# Release the node allocation, then merge the task outputs into the job output.
838 release_allocation();
840 my $collated_output = collate_output();
# Record the final state. Success requires both a collated output and no
# failure recorded in $main::success.
843 $Job->update_attributes('running' => 0,
844 'success' => $collated_output && $main::success,
845 'finished_at' => scalar gmtime)
848 if (!$collated_output) {
849 Log(undef, "output undef");
853 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
854 or die "failed to get collated manifest: $!";
855 # Read the original manifest, and strip permission hints from it,
856 # so we can put the result in a Collection.
857 my @stripped_manifest_lines = ();
858 my $orig_manifest_text = '';
859 while (my $manifest_line = <$orig_manifest>) {
860 $orig_manifest_text .= $manifest_line;
861 my @words = split(/ /, $manifest_line, -1);
862 foreach my $ii (0..$#words) {
863 if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
864 $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
867 push(@stripped_manifest_lines, join(" ", @words));
# The collection uuid is the md5 of the hint-stripped manifest; the stored
# manifest text keeps the original permission hints.
869 my $stripped_manifest_text = join("", @stripped_manifest_lines);
870 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
871 'uuid' => md5_hex($stripped_manifest_text),
872 'manifest_text' => $orig_manifest_text,
874 Log(undef, "output " . $output->{uuid});
875 $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
# Link the user to the collection just created. Use $output->{uuid}
# directly: the update of $Job->{'output'} above is skipped for jobs
# without a uuid, so the job attribute may not hold this collection.
876 if ($Job->{'output_is_persistent'}) {
877 $arv->{'links'}->{'create'}->execute('link' => {
878 'tail_kind' => 'arvados#user',
879 'tail_uuid' => $User->{'uuid'},
880 'head_kind' => 'arvados#collection',
881 'head_uuid' => $output->{uuid},
882 'link_class' => 'resources',
888 Log (undef, "Failed to register output manifest: $@");
892 Log (undef, "finish");
# Conventional exit status: 0 when the job succeeded, 1 otherwise. The
# previous expression was inverted and exited 1 on success.
895 exit ($Job->{'success'} ? 0 : 1);
# Publish todo/done/running task counts to the job's tasks_summary and log
# them. The "last updated" timestamp is always refreshed; the actual update
# happens only when $progress_is_dirty is set, and it clears the flag.
# "running" = slots that are neither free nor held.
899 sub update_progress_stats
901 $progress_stats_updated = time;
902 return if !$progress_is_dirty;
903 my ($todo, $done, $running) = (scalar @jobstep_todo,
904 scalar @jobstep_done,
905 scalar @slot - scalar @freeslot - scalar @holdslot);
906 $Job->{'tasks_summary'} ||= {};
907 $Job->{'tasks_summary'}->{'todo'} = $todo;
908 $Job->{'tasks_summary'}->{'done'} = $done;
909 $Job->{'tasks_summary'}->{'running'} = $running;
911 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
913 Log (undef, "status: $done done, $running running, $todo todo");
914 $progress_is_dirty = 0;
# Reap at most one exited child without blocking; returns 0 when none has
# exited. (The sub header and the success-path return are not visible here.)
921 my $pid = waitpid (-1, WNOHANG);
922 return 0 if $pid <= 0;
924 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
926 . $slot[$proc{$pid}->{slot}]->{cpu});
927 my $jobstepid = $proc{$pid}->{jobstep};
928 my $elapsed = time - $proc{$pid}->{time};
929 my $Jobstep = $jobstep[$jobstepid];
# Decode the wait status, then re-read the task record to learn whether the
# task itself reported success.
931 my $childstatus = $?;
932 my $exitvalue = $childstatus >> 8;
933 my $exitinfo = sprintf("exit %d signal %d%s",
936 ($childstatus & 128 ? ' core dump' : ''));
937 $Jobstep->{'arvados_task'}->reload;
938 my $task_success = $Jobstep->{'arvados_task'}->{success};
940 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
942 if (!defined $task_success) {
943 # task did not indicate one way or the other --> fail
944 $Jobstep->{'arvados_task'}->{success} = 0;
945 $Jobstep->{'arvados_task'}->save;
# Failure path: a node-level srun error or exit code 111 marks the failure
# as temporary.
952 $temporary_fail ||= $Jobstep->{node_fail};
953 $temporary_fail ||= ($exitvalue == 111);
956 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
958 # Check for signs of a failed or misconfigured node
959 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
960 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
961 # Don't count this against jobstep failure thresholds if this
962 # node is already suspected faulty and srun exited quickly
963 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
965 Log ($jobstepid, "blaming failure on suspect node " .
966 $slot[$proc{$pid}->{slot}]->{node}->{name});
967 $temporary_fail ||= 1;
969 ban_node_by_slot($proc{$pid}->{slot});
# NOTE(review): 'temporary ' carries a trailing space that 'permanent'
# lacks, so the log reads "(#N, temporary )".
972 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
973 ++$Jobstep->{'failures'},
974 $temporary_fail ? 'temporary ' : 'permanent',
977 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
978 # Give up on this task, and the whole job
980 $main::please_freeze = 1;
983 # Put this task back on the todo queue
984 push @jobstep_todo, $jobstepid;
986 $Job->{'tasks_summary'}->{'failed'}++;
# Success path (branch line not visible): clear the node's losing streak
# and hold, and record the task as done.
990 ++$thisround_succeeded;
991 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
992 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
993 push @jobstep_done, $jobstepid;
994 Log ($jobstepid, "success in $elapsed seconds");
# Common bookkeeping: record status and finish time, flush remaining
# stderr, close the pipe, and free the slot.
996 $Jobstep->{exitcode} = $childstatus;
997 $Jobstep->{finishtime} = time;
998 process_stderr ($jobstepid, $task_success);
999 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1001 close $reader{$jobstepid};
1002 delete $reader{$jobstepid};
1003 delete $slot[$proc{$pid}->{slot}]->{pid};
1004 push @freeslot, $proc{$pid}->{slot};
# A successful task may have created follow-up tasks. Page through them
# (ordered by qsequence, offset = count fetched so far) and queue each as a
# new jobstep at its 'sequence' level.
1007 if ($task_success) {
1009 my $newtask_list = [];
1010 my $newtask_results;
1012 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1014 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1016 'order' => 'qsequence',
1017 'offset' => scalar(@$newtask_list),
1019 push(@$newtask_list, @{$newtask_results->{items}});
1020 } while (@{$newtask_results->{items}});
1021 foreach my $arvados_task (@$newtask_list) {
1023 'level' => $arvados_task->{'sequence'},
1025 'arvados_task' => $arvados_task
1027 push @jobstep, $jobstep;
1028 push @jobstep_todo, $#jobstep;
1032 $progress_is_dirty = 1;
# If the CRUNCH_REFRESH_TRIGGER file was modified since the last refresh,
# re-read the job's cancellation attributes from the API (uuid jobs only).
# If the job has been cancelled, log it and request a freeze.
1036 sub check_refresh_wanted
1038 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1039 if (@stat && $stat[9] > $latest_refresh) {
1040 $latest_refresh = scalar time;
1041 if ($job_has_uuid) {
1042 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1043 for my $attr ('cancelled_at',
1044 'cancelled_by_user_uuid',
1045 'cancelled_by_client_uuid') {
1046 $Job->{$attr} = $Job2->{$attr};
1048 if ($Job->{'cancelled_at'}) {
1049 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1050 " by user " . $Job->{cancelled_by_user_uuid});
1052 $main::please_freeze = 1;
# Detect and kill children that SLURM no longer knows about. Two throttles
# apply: pending kills are processed at most every 4 s, and squeue is
# polled at most every 60 s. (The sub header is not visible here.)
1060 # return if the kill list was checked <4 seconds ago
1061 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1065 $squeue_kill_checked = time;
1067 # use killem() on procs whose killtime is reached
1070 if (exists $proc{$_}->{killtime}
1071 && $proc{$_}->{killtime} <= time)
1077 # return if the squeue was checked <60 seconds ago
1078 if (defined $squeue_checked && $squeue_checked > time - 60)
1082 $squeue_checked = time;
1086 # here is an opportunity to check for mysterious problems with local procs
1090 # get a list of steps still running
1091 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
# NOTE(review): backtick lines keep their trailing "\n", so this only
# matches if @squeue was chomped/chopped beforehand (not visible here) - confirm.
1093 if ($squeue[-1] ne "ok")
1099 # which of my jobsteps are running, according to squeue?
1103 if (/^(\d+)\.(\d+) (\S+)/)
1105 if ($1 eq $ENV{SLURM_JOBID})
1112 # which of my active child procs (>60s old) were not mentioned by squeue?
1113 foreach (keys %proc)
1115 if ($proc{$_}->{time} < time - 60
1116 && !exists $ok{$proc{$_}->{jobstepname}}
1117 && !exists $proc{$_}->{killtime})
1119 # kill this proc if it hasn't exited in 30 seconds
1120 $proc{$_}->{killtime} = time + 30;
# Give the SLURM allocation back by scancel-ing SLURM_JOBID (the shell
# command's exit status is ignored).
1126 sub release_allocation
1130 Log (undef, "release job allocation");
1131 system "scancel $ENV{SLURM_JOBID}";
# Drain every child's non-blocking pipe in 8 KiB reads. Data is echoed to
# STDERR in debug mode and appended to the jobstep's stderr buffer, and
# complete lines are handed to preprocess_stderr(). Once the buffer exceeds
# 16 KiB its oldest 8 KiB are discarded.
1139 foreach my $job (keys %reader)
1142 while (0 < sysread ($reader{$job}, $buf, 8192))
1144 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1145 $jobstep[$job]->{stderr} .= $buf;
1146 preprocess_stderr ($job);
1147 if (length ($jobstep[$job]->{stderr}) > 16384)
1149 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer and log each one.
# An expired or unconfirmable SLURM allocation triggers a freeze. A node
# failure or step-creation error flags the jobstep as a node failure and
# backs off its node.
1158 sub preprocess_stderr
1162 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1164 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1165 Log ($job, "stderr $line");
1166 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1168 $main::please_freeze = 1;
1170 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1171 $jobstep[$job]->{node_fail} = 1;
1172 ban_node_by_slot($jobstep[$job]->{slotindex});
# Final stderr flush for a finished jobstep: process complete lines, then
# log whatever partial text remains, line by line. $task_success is passed
# by the caller; its use is not visible in this listing.
1181 my $task_success = shift;
1182 preprocess_stderr ($job);
1185 Log ($job, "stderr $_");
1186 } split ("\n", $jobstep[$job]->{stderr});
# Fetch a Keep block by running `arv-get <hash>` and reading its stdout in
# 1 MiB sysread chunks until EOF. Returns the concatenated content; dies if
# arv-get cannot be started or a read fails.
1192 my ($keep, $child_out, $output_block);
1194 my $cmd = "arv-get \Q$hash\E";
1195 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1199 my $bytes = sysread($keep, $buf, 1024 * 1024);
1200 if (!defined $bytes) {
1201 die "reading from arv-get: $!";
1202 } elsif ($bytes == 0) {
1203 # sysread returns 0 at the end of the pipe.
1206 # some bytes were read into buf.
1207 $output_block .= $buf;
1211 return $output_block;
# Concatenate successful tasks' outputs into one manifest via `arv-put --raw`.
# A task output is used as follows:
#   - text that is not a bare block locator is written through as-is;
#   - with a single jobstep, the output is used directly as the job output;
#   - otherwise the referenced block is fetched and written through.
# $output_in_keep records whether any manifest references +K locators.
# arv-put's reply is read with a 120 s timeout.
1216 Log (undef, "collate");
1218 my ($child_out, $child_in);
1219 my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1223 next if (!exists $_->{'arvados_task'}->{'output'} ||
1224 !$_->{'arvados_task'}->{'success'});
1225 my $output = $_->{'arvados_task'}->{output};
1226 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1228 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1229 print $child_in $output;
1231 elsif (@jobstep == 1)
1233 $joboutput = $output;
1236 elsif (defined (my $outblock = fetch_block ($output)))
1238 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1239 print $child_in $outblock;
1243 Log (undef, "XXX fetch_block($output) failed XXX");
1249 if (!defined $joboutput) {
1250 my $s = IO::Select->new($child_out);
1251 if ($s->can_read(120)) {
1252 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1255 Log (undef, "timed out reading from 'arv-put'");
# Escalating kill for each pid. The signal goes SIGINT -> SIGTERM -> SIGKILL,
# moving to the next one only after the previous signal has been
# outstanding for more than 4 s. Each new signal is sent twice, 0.1 s apart,
# because srun needs two SIGINTs to really interrupt. Send times and the
# kill latency are recorded in %proc.
1268 my $sig = 2; # SIGINT first
1269 if (exists $proc{$_}->{"sent_$sig"} &&
1270 time - $proc{$_}->{"sent_$sig"} > 4)
1272 $sig = 15; # SIGTERM if SIGINT doesn't work
1274 if (exists $proc{$_}->{"sent_$sig"} &&
1275 time - $proc{$_}->{"sent_$sig"} > 4)
1277 $sig = 9; # SIGKILL if SIGTERM doesn't work
1279 if (!exists $proc{$_}->{"sent_$sig"})
1281 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1283 select (undef, undef, undef, 0.1);
1286 kill $sig, $_; # srun wants two SIGINT to really interrupt
1288 $proc{$_}->{"sent_$sig"} = time;
1289 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set this handle's file-descriptor bit in the select() bit vector $bits
# (the enclosing loop and sub header are not visible here).
1299 vec($bits,fileno($_),1) = 1;
# Log a message for a jobstep (callers pass undef for job-level messages).
# Multi-line messages are split on newlines (presumably re-logged one line
# at a time; the inner call is not visible). Each line is formatted as
# "<job_id> <pid> <jobstep> <message>" with characters outside space..'~'
# escaped as \ooo octal. A UTC timestamp is prepended when STDERR is a
# terminal, and always in the local log file.
1305 sub Log # ($jobstep_id, $logmessage)
1307 if ($_[1] =~ /\n/) {
1308 for my $line (split (/\n/, $_[1])) {
# Unbuffer STDERR without changing the currently selected handle.
1313 my $fh = select STDERR; $|=1; select $fh;
1314 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1315 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1318 if ($local_logfile || -t STDERR) {
1319 my @gmtime = gmtime;
1320 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1321 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1323 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1325 if ($local_logfile) {
1326 print $local_logfile $datetime . " " . $message;
# Fatal-error handler (also installed as the TERM handler). It logs the
# message with the caller's file/line. If tasks are still queued it calls
# freeze() and collate_output(), and it saves the log when a local log file
# exists. The lines that terminate the process are not visible here.
1333 my ($package, $file, $line) = caller;
1334 my $message = "@_ at $file line $line\n";
1335 Log (undef, $message);
1336 freeze() if @jobstep_todo;
1337 collate_output() if @jobstep_todo;
1339 save_meta() if $local_logfile;
# For jobs with a uuid only: mark the job as no longer running and stamp
# finished_at (a middle attribute line is not visible here).
1346 return if !$job_has_uuid;
1347 $Job->update_attributes('running' => 0,
1349 'finished_at' => scalar gmtime);
# Upload the local log file to Keep with arv-put and record the resulting
# locator as the job's log. Checkpoint calls return immediately. Dies if
# arv-put fails. Afterwards $local_logfile is undef, so later Log() calls
# only write to STDERR.
1355 my $justcheckpoint = shift; # false if this will be the last meta saved
1356 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1358 $local_logfile->flush;
1359 my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1360 . quotemeta($local_logfile->filename);
1361 my $loglocator = `$cmd`;
1362 die "system $cmd failed: $?" if $?;
1365 $local_logfile = undef; # the temp file is automatically deleted
1366 Log (undef, "log manifest is $loglocator");
1367 $Job->{'log'} = $loglocator;
1368 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# If a freeze was requested, release the allocation and kill every tracked
# child. Extra pids passed in (e.g. the install/cleanup child) are first
# registered in %proc so killem() signals them too. Then reap whatever has
# exited. The remainder of the shutdown is not visible here.
1372 sub freeze_if_want_freeze
1374 if ($main::please_freeze)
1376 release_allocation();
1379 # kill some srun procs before freeze+stop
1380 map { $proc{$_} = {} } @_;
1383 killem (keys %proc);
1384 select (undef, undef, undef, 0.1);
1386 while (($died = waitpid (-1, WNOHANG)) > 0)
1388 delete $proc{$died};
# Freezing is no longer implemented; this only logs a notice.
1403 Log (undef, "Freeze not implemented");
# Thawing is not supported; this aborts via croak().
1410 croak ("Thaw not implemented");
# Undo backslash-escaping: "\n" becomes a newline; any other "\X" becomes X.
1426 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run @$execargs, prefixed by @$srunargs when running under SLURM.
# - If stdin text is supplied, a forked child writes it to a pipe that
#   becomes our STDIN.
# - With $opts->{fork}, the command runs via system() and its status is
#   returned.
# - Otherwise the command presumably replaces this process via exec (that
#   line is not visible), so reaching the final die means exec failed.
1433 my $srunargs = shift;
1434 my $execargs = shift;
1435 my $opts = shift || {};
1437 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Debug: print the command line, quoting arguments that contain spaces.
1438 print STDERR (join (" ",
1439 map { / / ? "'$_'" : $_ }
1442 if $ENV{CRUNCH_DEBUG};
1444 if (defined $stdin) {
1445 my $child = open STDIN, "-|";
1446 defined $child or die "no fork: $!";
1448 print $stdin or die $!;
1449 close STDOUT or die $!;
1454 return system (@$args) if $opts->{fork};
1457 warn "ENV size is ".length(join(" ",%ENV));
1458 die "exec failed: $!: @$args";
# Put the node owning slot $slotid on hold for 60 s and bump its hold count.
# (The main loop gives up a round when no slot's node has a zero losing
# streak and fewer than 4 holds.)
1462 sub ban_node_by_slot {
1463 # Don't start any new jobsteps on this node for 60 seconds
1465 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1466 $slot[$slotid]->{node}->{hold_count}++;
1467 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Take an exclusive, non-blocking flock on $lockfile, or croak with
# $error_message. The lock lives on the global bareword handle L and is held
# only while L stays open.
1472 my ($lockfile, $error_message) = @_;
1473 open L, ">", $lockfile or croak("$lockfile: $!");
1474 if (!flock L, LOCK_EX|LOCK_NB) {
1475 croak("Can't lock $lockfile: $error_message\n");
1479 sub find_docker_image {
1480 # Given a Keep locator, check to see if it contains a Docker image.
1481 # If so, return its stream name and Docker hash.
1482 # If not, return undef for both values.
# Assumes each $image->{files} entry is an array of [stream, filename, ...]
# (inferred from the indexing below - confirm against the collections API).
# The image must be the collection's only file, named <64 hex digits>.tar.
1483 my $locator = shift;
1484 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1485 my @file_list = @{$image->{files}};
1486 if ((scalar(@file_list) == 1) &&
1487 ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1488 return ($file_list[0][0], $1);
1491 return (undef, undef);
1497 # checkout-and-build
# Script run on each node via `perl -` with the git archive of the commit
# appended (read back through <DATA>). It:
#   1. serializes installs with a lock on $CRUNCH_SRC.lock;
#   2. skips reinstall when $CRUNCH_SRC.commit already points at this commit;
#   3. unpacks the archive, builds the Python SDK into a virtualenv, and runs
#      the repo's install hook;
#   4. atomically records the installed commit as a symlink.
1500 use File::Path qw( make_path );
1502 my $destdir = $ENV{"CRUNCH_SRC"};
1503 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1504 my $repo = $ENV{"CRUNCH_SRC_URL"};
1505 my $task_work = $ENV{"TASK_WORK"};
1508 make_path $task_work;
1509 -e $task_work or die "Failed to create temporary working directory ($task_work): $!";
1512 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Already installed at this commit? (The lines handling that case are not visible.)
1514 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
# Invalidate the marker before rebuilding; build output goes to $destdir.log.
1518 unlink "$destdir.commit";
1519 open STDOUT, ">", "$destdir.log";
1520 open STDERR, ">&STDOUT";
1523 my @git_archive_data = <DATA>;
1524 if (@git_archive_data) {
1525 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1526 print TARX @git_archive_data;
1528 die "'tar -C $destdir -xf -' exited $?: $!";
1533 chomp ($pwd = `pwd`);
1534 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1537 for my $src_path ("$destdir/arvados/sdk/python") {
1539 shell_or_die ("virtualenv", $install_dir);
1540 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Repo-provided install hook: crunch_scripts/install, else tests/autotests.sh
# (only when there is no ./install.sh), else ./install.sh.
1544 if (-e "$destdir/crunch_scripts/install") {
1545 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1546 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1548 shell_or_die ("./tests/autotests.sh", $install_dir);
1549 } elsif (-e "./install.sh") {
1550 shell_or_die ("./install.sh", $install_dir);
# Record the installed commit atomically: create a new symlink, then rename it over the old one.
1554 unlink "$destdir.commit.new";
1555 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1556 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# shell_or_die helper (header not visible): echo the command when DEBUG is
# set, and die with $! and the hex wait status if it fails.
1565 if ($ENV{"DEBUG"}) {
1566 print STDERR "@_\n";
1569 or die "@_ failed: $! exit 0x".sprintf("%x",$?);