2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
# Module imports and scratch-directory environment setup.
# NOTE(review): this is a sampled listing; original lines are missing between
# some of the lines below (e.g. closing braces), so comments hedge accordingly.
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Digest::MD5 qw(md5_hex);
# All per-job scratch space lives under $CRUNCH_TMP, defaulting to
# $TMPDIR/crunch-job (suffixed with the uid for non-root, non-"crunch" users
# so concurrent users on one host do not collide).
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89 if ($ENV{"USER"} ne "crunch" && $< != 0) {
90 # use a tmp dir unique for my uid
91 $ENV{"CRUNCH_TMP"} .= "-$<";
# Derived paths exported to every child process.
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
# Command-line options (see POD above for their meanings).
105 GetOptions('force-unlock' => \$force_unlock,
106 'git-dir=s' => \$git_dir,
107 'job=s' => \$jobspec,
108 'job-api-token=s' => \$job_api_token,
109 'no-clear-tmp' => \$no_clear_tmp,
110 'resume-stash=s' => \$resume_stash,
# An explicit --job-api-token overrides whatever token is in the environment.
113 if (defined $job_api_token) {
114 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# A --job argument that looks like a bare UUID means "fetch the job record
# from the API server"; anything else (e.g. a JSON blob) is a local job.
117 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
118 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
119 my $local_job = !$job_has_uuid;
# CRUNCH_DEBUG is normalized to exactly 0 or 1 for child processes.
124 $main::ENV{CRUNCH_DEBUG} = 1;
128 $main::ENV{CRUNCH_DEBUG} = 0;
# Connect to Arvados and obtain the Job record, either from the API server
# (UUID jobspec) or by decoding/creating it from the command line (local job).
133 my $arv = Arvados->new('apiVersion' => 'v1');
136 my $User = $arv->{'users'}->{'current'}->execute;
144 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Unless --force-unlock was given, refuse to touch a job that is locked,
# already finished, already running, or already started.
145 if (!$force_unlock) {
146 if ($Job->{'is_locked_by_uuid'}) {
147 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
149 if ($Job->{'success'} ne undef) {
150 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
152 if ($Job->{'running'}) {
153 croak("Job 'running' flag is already set");
155 if ($Job->{'started_at'}) {
156 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local job: the jobspec itself is a JSON job description.
162 $Job = JSON::decode_json($jobspec);
166 map { croak ("No $_ specified") unless $Job->{$_} }
167 qw(script script_version script_parameters);
170 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
171 $Job->{'started_at'} = gmtime;
173 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
177 $job_id = $Job->{'uuid'};
# Log is buffered in a temp file and saved to Keep as $job_id.log.txt.
179 my $keep_logfile = $job_id . '.log.txt';
180 $local_logfile = File::Temp->new();
# max_tasks_per_node==0 means "no limit".
182 $Job->{'runtime_constraints'} ||= {};
183 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
184 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Discover the compute nodes and CPU slots available to this job: either the
# SLURM allocation (via sinfo) or just the local host's processor count.
187 Log (undef, "check slurm allocation");
190 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
194 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
195 push @sinfo, "$localcpus localhost";
197 if (exists $ENV{SLURM_NODELIST})
199 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
# Each sinfo line is "<ncpus> <nodelist>"; ncpus is capped by
# max_tasks_per_node when that constraint is set.
203 my ($ncpus, $slurm_nodelist) = split;
204 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Expand SLURM's compressed nodelist syntax, e.g. "node[1-3,5]" -> node1..node3,node5.
207 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
210 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
213 foreach (split (",", $ranges))
226 push @nodelist, map {
228 $n =~ s/\[[-,\d]+\]/$_/;
235 push @nodelist, $nodelist;
# One @slot entry per (node, cpu) pair; jobsteps are assigned to slots.
238 foreach my $nodename (@nodelist)
240 Log (undef, "node $nodename - $ncpus slots");
241 my $node = { name => $nodename,
245 foreach my $cpu (1..$ncpus)
247 push @slot, { node => $node,
251 push @node, @nodelist;
266 # Claim this job, and make sure nobody else does
# NOTE(review): '==' here compares UUID strings numerically; the real guard is
# the update_attributes() success check on the previous line — confirm upstream.
267 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
268 $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
269 croak("Error while updating / locking job");
271 $Job->update_attributes('started_at' => scalar gmtime,
274 'tasks_summary' => { 'failed' => 0,
281 Log (undef, "start");
# Signal handlers set flags that the main loop polls (see POD: INT/QUIT/TSTP
# freeze, ALRM reports info, CONT resumes, HUP refreshes job/node state).
282 $SIG{'INT'} = sub { $main::please_freeze = 1; };
283 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
284 $SIG{'TERM'} = \&croak;
285 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
286 $SIG{'ALRM'} = sub { $main::please_info = 1; };
287 $SIG{'CONT'} = sub { $main::please_continue = 1; };
288 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290 $main::please_freeze = 0;
291 $main::please_info = 0;
292 $main::please_continue = 0;
293 $main::please_refresh = 0;
294 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Job "knobs" of the form NOCACHE...=value are copied into the environment.
296 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
297 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
298 $ENV{"JOB_UUID"} = $job_id;
# Jobstep bookkeeping: todo/done queues plus merge state.
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
# Resume from a frozen state if the job record points at one; otherwise
# create the initial level-0 task.
313 if (defined $Job->{thawedfromkey})
315 thaw ($Job->{thawedfromkey});
319 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
320 'job_uuid' => $Job->{'uuid'},
325 push @jobstep, { 'level' => 0,
327 'arvados_task' => $first_task,
329 push @jobstep_todo, 0;
# Only one crunch-job may use this CRUNCH_TMP at a time.
335 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
342 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# A local job whose script_version is an absolute path skips the git install.
344 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
# skip_install path: run the script straight out of the local tree, after
# optionally clearing old per-job temp dirs and building the Python SDK
# into a virtualenv under $CRUNCH_TMP/opt.
347 if (!defined $no_clear_tmp) {
348 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
349 system($clear_tmp_cmd) == 0
350 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
352 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
353 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
355 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
356 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
357 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
359 or croak ("setup.py in $src_path failed: exit ".($?>>8));
# Non-local path: the __DATA__ section of this script is the remote
# checkout-and-build program (see end of file).
367 $build_script = <DATA>;
369 Log (undef, "Install revision ".$Job->{script_version});
370 my $nodelist = join(",", @node);
372 if (!defined $no_clear_tmp) {
373 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
# The cleanup runs via srun on every node; the parent polls for the child
# while honoring freeze requests.
375 my $cleanpid = fork();
378 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
379 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
384 last if $cleanpid == waitpid (-1, WNOHANG);
385 freeze_if_want_freeze ($cleanpid);
386 select (undef, undef, undef, 0.1);
388 Log (undef, "Clean-work-dir exited $?");
391 # Install requested code version
394 my @srunargs = ("srun",
395 "--nodelist=$nodelist",
396 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
398 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
399 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
403 my $treeish = $Job->{'script_version'};
405 # If we're running under crunch-dispatch, it will have pulled the
406 # appropriate source tree into its own repository, and given us that
407 # repo's path as $git_dir. If we're running a "local" job, and a
408 # script_version was specified, it's up to the user to provide the
409 # full path to a local repository in Job->{repository}.
411 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
412 # git-archive --remote where appropriate.
414 # TODO: Accept a locally-hosted Arvados repository by name or
415 # UUID. Use arvados.v1.repositories.list or .get to figure out the
416 # appropriate fetch-url.
417 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
419 $ENV{"CRUNCH_SRC_URL"} = $repo;
421 if (-d "$repo/.git") {
422 # We were given a working directory, but we are only interested in
424 $repo = "$repo/.git";
427 # If this looks like a subversion r#, look for it in git-svn commit messages
# NOTE(review): the shell quoting in this backtick command looks fragile
# (unbalanced-looking quotes around \Q$treeish\E) — verify against the
# canonical source before touching it.
429 if ($treeish =~ m{^\d{1,4}$}) {
430 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
432 if ($gitlog =~ /^[a-f0-9]{40}$/) {
434 Log (undef, "Using commit $commit for script_version $treeish");
438 # If that didn't work, try asking git to look it up as a tree-ish.
440 if (!defined $commit) {
441 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
443 if ($found =~ /^[0-9a-f]{40}$/s) {
445 if ($commit ne $treeish) {
446 # Make sure we record the real commit id in the database,
447 # frozentokey, logs, etc. -- instead of an abbreviation or a
448 # branch name which can become ambiguous or point to a
449 # different commit in the future.
450 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
451 Log (undef, "Using commit $commit for tree-ish $treeish");
452 if ($commit ne $treeish) {
453 $Job->{'script_version'} = $commit;
455 $Job->update_attributes('script_version' => $commit) or
456 croak("Error while updating job");
# With a resolved commit, ship a git archive plus the build script to the
# nodes; the build script (sent on stdin, "perl -") unpacks and installs it.
462 if (defined $commit) {
463 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
464 @execargs = ("sh", "-c",
465 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
466 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
469 croak ("could not figure out commit id for $treeish");
# Fork the installer and poll, honoring freeze requests meanwhile.
472 my $installpid = fork();
473 if ($installpid == 0)
475 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
480 last if $installpid == waitpid (-1, WNOHANG);
481 freeze_if_want_freeze ($installpid);
482 select (undef, undef, undef, 0.1);
484 Log (undef, "Install exited $?");
489 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
490 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
493 # If this job requires a Docker image, install that.
494 my $docker_bin = "/usr/bin/docker.io";
495 my ($docker_locator, $docker_hash);
496 if ($docker_locator = $Job->{docker_image_locator}) {
497 $docker_hash = find_docker_hash($docker_locator);
500 croak("No Docker image hash found from locator $docker_locator");
# Load the image from Keep on each node, but only if it is not already
# present in the local Docker image store.
502 my $docker_install_script = qq{
503 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
504 arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
507 my $docker_pid = fork();
508 if ($docker_pid == 0)
510 srun (["srun", "--nodelist=" . join(',', @node)],
511 ["/bin/sh", "-ec", $docker_install_script]);
516 last if $docker_pid == waitpid (-1, WNOHANG);
517 freeze_if_want_freeze ($docker_pid);
518 select (undef, undef, undef, 0.1);
522 croak("Installing Docker image from $docker_locator returned exit code $?");
# Log the job's defining attributes, then initialize per-round counters.
526 foreach (qw (script script_version script_parameters runtime_constraints))
530 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
532 foreach (split (/\n/, $Job->{knobs}))
534 Log (undef, "knob " . $_);
539 $main::success = undef;
# A "round" is one pass over all tasks at the current level.
545 my $thisround_succeeded = 0;
546 my $thisround_failed = 0;
547 my $thisround_failed_multiple = 0;
# Tasks run level by level, in ascending qsequence order within a level.
549 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
550 or $a <=> $b } @jobstep_todo;
551 my $level = $jobstep[$jobstep_todo[0]]->{level};
552 Log (undef, "start level $level");
557 my @freeslot = (0..$#slot);
560 my $progress_is_dirty = 1;
561 my $progress_stats_updated = 0;
563 update_progress_stats();
# Main dispatch loop: for each pending jobstep at the current level, fork a
# child that execs the task (via srun, optionally inside Docker), wiring the
# child's stdout/stderr back through a non-blocking pipe.
568 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
570 my $id = $jobstep_todo[$todo_ptr];
571 my $Jobstep = $jobstep[$id];
572 if ($Jobstep->{level} != $level)
# Non-blocking reader so readfrompipes() can poll without stalling.
577 pipe $reader{$id}, "writer" or croak ($!);
578 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
579 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Take the first free (node, cpu) slot for this task.
581 my $childslot = $freeslot[0];
582 my $childnode = $slot[$childslot]->{node};
583 my $childslotname = join (".",
584 $slot[$childslot]->{node}->{name},
585 $slot[$childslot]->{cpu});
586 my $childpid = fork();
# ---- child process from here down to the srun() call ----
589 $SIG{'INT'} = 'DEFAULT';
590 $SIG{'QUIT'} = 'DEFAULT';
591 $SIG{'TERM'} = 'DEFAULT';
593 foreach (values (%reader))
597 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
598 open(STDOUT,">&writer");
599 open(STDERR,">&writer");
# Task environment: identity, sequencing, per-task work dirs.
604 delete $ENV{"GNUPGHOME"};
605 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
606 $ENV{"TASK_QSEQUENCE"} = $id;
607 $ENV{"TASK_SEQUENCE"} = $level;
608 $ENV{"JOB_SCRIPT"} = $Job->{script};
609 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
610 $param =~ tr/a-z/A-Z/;
611 $ENV{"JOB_PARAMETER_$param"} = $value;
613 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
614 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
615 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
616 $ENV{"HOME"} = $ENV{"TASK_WORK"};
617 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun pins the task to its assigned node with one task/one cpu.
626 "--nodelist=".$childnode->{name},
627 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628 "--job-name=$job_id.$id.$$",
630 my $build_script_to_send = "";
# Shell command: recreate the task work dir, mount Keep, then run the
# script under crunchstat -- inside Docker when a $docker_hash is set.
632 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
634 ."&& cd $ENV{CRUNCH_TMP} ";
637 $build_script_to_send = $build_script;
641 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
644 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
645 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
646 # Dynamically configure the container to use the host system as its
647 # DNS server. Get the host's global addresses from the ip command,
648 # and turn them into docker --dns options using gawk.
650 q{$(ip -o address show scope global |
651 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
652 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
653 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
654 $command .= "--env=\QHOME=/home/crunch\E ";
# Forward ARVADOS_*/JOB_*/TASK_* env vars into the container, rewriting
# host paths to their in-container equivalents.
655 while (my ($env_key, $env_val) = each %ENV)
657 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
658 if ($env_key eq "TASK_WORK") {
659 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
661 elsif ($env_key eq "TASK_KEEPMOUNT") {
662 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
664 elsif ($env_key eq "CRUNCH_SRC") {
665 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
668 $command .= "--env=\Q$env_key=$env_val\E ";
672 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
673 $command .= "\Q$docker_hash\E ";
674 $command .= "stdbuf --output=0 --error=0 ";
675 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-Docker variant of the same command.
678 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
679 $command .= "stdbuf --output=0 --error=0 ";
680 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
683 my @execargs = ('bash', '-c', $command);
684 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# ---- parent process resumes here ----
688 if (!defined $childpid)
695 $proc{$childpid} = { jobstep => $id,
698 jobstepname => "$job_id.$id.$childpid",
700 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
701 $slot[$childslot]->{pid} = $childpid;
703 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
704 Log ($id, "child $childpid started on $childslotname");
705 $Jobstep->{starttime} = time;
706 $Jobstep->{node} = $childnode->{name};
707 $Jobstep->{slotindex} = $childslot;
708 delete $Jobstep->{stderr};
709 delete $Jobstep->{finishtime};
# Remove the dispatched step from the todo queue.
711 splice @jobstep_todo, $todo_ptr, 1;
714 $progress_is_dirty = 1;
# Between dispatches: wait for children, honor operator signals, apply
# failure-rate circuit breakers, and manage the node hold/free lists.
718 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
720 last THISROUND if $main::please_freeze;
721 if ($main::please_info)
723 $main::please_info = 0;
727 update_progress_stats();
734 check_refresh_wanted();
736 update_progress_stats();
737 select (undef, undef, undef, 0.1);
# Refresh stats at least every 30 seconds even when nothing exits.
739 elsif (time - $progress_stats_updated >= 30)
741 update_progress_stats();
# Circuit breaker: abandon the round if repeated failures dominate.
743 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
744 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
746 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
747 .($thisround_failed+$thisround_succeeded)
748 .") -- giving up on this round";
749 Log (undef, $message);
753 # move slots from freeslot to holdslot (or back to freeslot) if necessary
754 for (my $i=$#freeslot; $i>=0; $i--) {
755 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
756 push @holdslot, (splice @freeslot, $i, 1);
759 for (my $i=$#holdslot; $i>=0; $i--) {
760 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
761 push @freeslot, (splice @holdslot, $i, 1);
765 # give up if no nodes are succeeding
766 if (!grep { $_->{node}->{losing_streak} == 0 &&
767 $_->{node}->{hold_count} < 4 } @slot) {
768 my $message = "Every node has failed -- giving up on this round";
769 Log (undef, $message);
# Round over: release held slots and reset losing streaks for the next pass.
776 push @freeslot, splice @holdslot;
777 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
780 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
783 if ($main::please_continue) {
784 $main::please_continue = 0;
787 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
791 check_refresh_wanted();
793 update_progress_stats();
794 select (undef, undef, undef, 0.1);
795 killem (keys %proc) if $main::please_freeze;
799 update_progress_stats();
800 freeze_if_want_freeze();
# Decide round outcome; too many failures with no successes stops the job.
803 if (!defined $main::success)
806 $thisround_succeeded == 0 &&
807 ($thisround_failed == 0 || $thisround_failed > 4))
809 my $message = "stop because $thisround_failed tasks failed and none succeeded";
810 Log (undef, $message);
# Loop back for the next level until success/failure is decided.
819 goto ONELEVEL if !defined $main::success;
# Job finished: release nodes, collate task outputs into one manifest,
# register the output Collection, and update the Job record.
822 release_allocation();
824 my $collated_output = &collate_output();
827 $Job->update_attributes('running' => 0,
828 'success' => $collated_output && $main::success,
829 'finished_at' => scalar gmtime)
832 if ($collated_output)
# Fetch the collated manifest back from Keep so it can be stored as a
# Collection record.
835 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
836 or die "failed to get collated manifest: $!";
837 # Read the original manifest, and strip permission hints from it,
838 # so we can put the result in a Collection.
839 my @stripped_manifest_lines = ();
840 my $orig_manifest_text = '';
841 while (my $manifest_line = <$orig_manifest>) {
842 $orig_manifest_text .= $manifest_line;
843 my @words = split(/ /, $manifest_line, -1);
844 foreach my $ii (0..$#words) {
845 if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
# Drop "+A<signature>@<expiry>" permission hints from block locators; the
# Collection uuid is the md5 of the stripped text.
846 $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
849 push(@stripped_manifest_lines, join(" ", @words));
851 my $stripped_manifest_text = join("", @stripped_manifest_lines);
852 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
853 'uuid' => md5_hex($stripped_manifest_text),
854 'manifest_text' => $orig_manifest_text,
856 $Job->update_attributes('output' => $output->{uuid});
# Persistent outputs get a resources link from the user to the collection.
857 if ($Job->{'output_is_persistent'}) {
858 $arv->{'links'}->{'create'}->execute('link' => {
859 'tail_kind' => 'arvados#user',
860 'tail_uuid' => $User->{'uuid'},
861 'head_kind' => 'arvados#collection',
862 'head_uuid' => $Job->{'output'},
863 'link_class' => 'resources',
869 Log (undef, "Failed to register output manifest: $@");
873 Log (undef, "finish");
# Recompute todo/done/running counts, push them to the Job record's
# tasks_summary, and log a status line. No-op unless $progress_is_dirty.
880 sub update_progress_stats
882 $progress_stats_updated = time;
883 return if !$progress_is_dirty;
884 my ($todo, $done, $running) = (scalar @jobstep_todo,
885 scalar @jobstep_done,
886 scalar @slot - scalar @freeslot - scalar @holdslot);
887 $Job->{'tasks_summary'} ||= {};
888 $Job->{'tasks_summary'}->{'todo'} = $todo;
889 $Job->{'tasks_summary'}->{'done'} = $done;
890 $Job->{'tasks_summary'}->{'running'} = $running;
892 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
894 Log (undef, "status: $done done, $running running, $todo todo");
895 $progress_is_dirty = 0;
# Reap one exited child (non-blocking). Classifies the task as success,
# temporary failure (requeued) or permanent failure (job stops), applies
# node-blame heuristics, and enqueues any subtasks the task created.
902 my $pid = waitpid (-1, WNOHANG);
903 return 0 if $pid <= 0;
905 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
907 . $slot[$proc{$pid}->{slot}]->{cpu});
908 my $jobstepid = $proc{$pid}->{jobstep};
909 my $elapsed = time - $proc{$pid}->{time};
910 my $Jobstep = $jobstep[$jobstepid];
912 my $childstatus = $?;
913 my $exitvalue = $childstatus >> 8;
914 my $exitinfo = sprintf("exit %d signal %d%s",
917 ($childstatus & 128 ? ' core dump' : ''));
# The task itself records success/failure on its Arvados task record.
918 $Jobstep->{'arvados_task'}->reload;
919 my $task_success = $Jobstep->{'arvados_task'}->{success};
921 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
923 if (!defined $task_success) {
924 # task did not indicate one way or the other --> fail
925 $Jobstep->{'arvados_task'}->{success} = 0;
926 $Jobstep->{'arvados_task'}->save;
# Exit code 111 is the conventional "temporary failure" code.
933 $temporary_fail ||= $Jobstep->{node_fail};
934 $temporary_fail ||= ($exitvalue == 111);
937 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
939 # Check for signs of a failed or misconfigured node
940 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
941 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
942 # Don't count this against jobstep failure thresholds if this
943 # node is already suspected faulty and srun exited quickly
944 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
946 Log ($jobstepid, "blaming failure on suspect node " .
947 $slot[$proc{$pid}->{slot}]->{node}->{name});
948 $temporary_fail ||= 1;
950 ban_node_by_slot($proc{$pid}->{slot});
953 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
954 ++$Jobstep->{'failures'},
955 $temporary_fail ? 'temporary ' : 'permanent',
# Three strikes (or any permanent failure) ends the whole job.
958 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
959 # Give up on this task, and the whole job
961 $main::please_freeze = 1;
964 # Put this task back on the todo queue
965 push @jobstep_todo, $jobstepid;
967 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: reset the node's failure bookkeeping.
971 ++$thisround_succeeded;
972 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
973 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
974 push @jobstep_done, $jobstepid;
975 Log ($jobstepid, "success in $elapsed seconds");
977 $Jobstep->{exitcode} = $childstatus;
978 $Jobstep->{finishtime} = time;
979 process_stderr ($jobstepid, $task_success);
980 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
982 close $reader{$jobstepid};
983 delete $reader{$jobstepid};
984 delete $slot[$proc{$pid}->{slot}]->{pid};
985 push @freeslot, $proc{$pid}->{slot};
# Page through any job_tasks this task created and add them to the queue.
990 my $newtask_list = [];
993 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
995 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
997 'order' => 'qsequence',
998 'offset' => scalar(@$newtask_list),
1000 push(@$newtask_list, @{$newtask_results->{items}});
1001 } while (@{$newtask_results->{items}});
1002 foreach my $arvados_task (@$newtask_list) {
1004 'level' => $arvados_task->{'sequence'},
1006 'arvados_task' => $arvados_task
1008 push @jobstep, $jobstep;
1009 push @jobstep_todo, $#jobstep;
1013 $progress_is_dirty = 1;
# If the CRUNCH_REFRESH_TRIGGER file has been touched since our last check,
# re-fetch cancellation attributes from the API and freeze if cancelled.
1017 sub check_refresh_wanted
1019 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1020 if (@stat && $stat[9] > $latest_refresh) {
1021 $latest_refresh = scalar time;
1022 if ($job_has_uuid) {
1023 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1024 for my $attr ('cancelled_at',
1025 'cancelled_by_user_uuid',
1026 'cancelled_by_client_uuid') {
1027 $Job->{$attr} = $Job2->{$attr};
1029 if ($Job->{'cancelled_at'}) {
1030 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1031 " by user " . $Job->{cancelled_by_user_uuid});
1033 $main::please_freeze = 1;
# Cross-check our child processes against SLURM's squeue: kill procs whose
# killtime has arrived, and schedule a kill for any proc older than 60s
# that squeue no longer reports. Rate-limited (4s for kills, 60s for squeue).
1041 # return if the kill list was checked <4 seconds ago
1042 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1046 $squeue_kill_checked = time;
1048 # use killem() on procs whose killtime is reached
1051 if (exists $proc{$_}->{killtime}
1052 && $proc{$_}->{killtime} <= time)
1058 # return if the squeue was checked <60 seconds ago
1059 if (defined $squeue_checked && $squeue_checked > time - 60)
1063 $squeue_checked = time;
1067 # here is an opportunity to check for mysterious problems with local procs
1071 # get a list of steps still running
# "echo ok" sentinel distinguishes an empty queue from a failed squeue run.
1072 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1074 if ($squeue[-1] ne "ok")
1080 # which of my jobsteps are running, according to squeue?
1084 if (/^(\d+)\.(\d+) (\S+)/)
1086 if ($1 eq $ENV{SLURM_JOBID})
1093 # which of my active child procs (>60s old) were not mentioned by squeue?
1094 foreach (keys %proc)
1096 if ($proc{$_}->{time} < time - 60
1097 && !exists $ok{$proc{$_}->{jobstepname}}
1098 && !exists $proc{$_}->{killtime})
1100 # kill this proc if it hasn't exited in 30 seconds
1101 $proc{$_}->{killtime} = time + 30;
# Give the SLURM node allocation back by cancelling our own job id.
1107 sub release_allocation
1111 Log (undef, "release job allocation");
1112 system "scancel $ENV{SLURM_JOBID}";
# Drain every jobstep's non-blocking stderr pipe into its buffer, logging
# complete lines via preprocess_stderr and capping the buffer at ~16KB.
1120 foreach my $job (keys %reader)
1123 while (0 < sysread ($reader{$job}, $buf, 8192))
1125 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1126 $jobstep[$job]->{stderr} .= $buf;
1127 preprocess_stderr ($job);
1128 if (length ($jobstep[$job]->{stderr}) > 16384)
# Keep only the most recent half of an oversized buffer.
1130 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer, logging each and
# reacting to srun error patterns: expired/unconfirmed allocation -> freeze
# the whole job; node failure -> mark the step as a node failure and ban
# the node for a while.
1139 sub preprocess_stderr
1143 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1145 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1146 Log ($job, "stderr $line");
1147 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1149 $main::please_freeze = 1;
1151 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1152 $jobstep[$job]->{node_fail} = 1;
1153 ban_node_by_slot($jobstep[$job]->{slotindex});
# Final stderr flush for a finished jobstep: run the usual line processing,
# then log whatever partial data remains in the buffer.
1162 my $task_success = shift;
1163 preprocess_stderr ($job);
1166 Log ($job, "stderr $_");
1167 } split ("\n", $jobstep[$job]->{stderr});
# Fetch a data block from Keep by locator via arv-get; returns up to 64MiB.
1173 my ($keep, $child_out, $output_block);
1175 my $cmd = "arv-get \Q$hash\E";
1176 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# NOTE(review): a single sysread may return fewer bytes than available --
# presumably blocks are <=64MiB and arrive in one read; confirm upstream.
1177 sysread($keep, $output_block, 64 * 1024 * 1024);
1179 return $output_block;
# Combine every successful task's output into a single manifest via
# "arv-put --raw", returning the resulting Keep locator (undef on failure).
1184 Log (undef, "collate");
1186 my ($child_out, $child_in);
1187 my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
# Skip tasks with no output or that did not succeed.
1191 next if (!exists $_->{'arvados_task'}->{output} ||
1192 !$_->{'arvados_task'}->{'success'} ||
1193 $_->{'exitcode'} != 0);
1194 my $output = $_->{'arvados_task'}->{output};
# Output may be a manifest fragment (fed to arv-put directly), a bare block
# locator (fetched and fed), or -- for a single-task job -- used verbatim.
1195 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1197 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1198 print $child_in $output;
1200 elsif (@jobstep == 1)
1202 $joboutput = $output;
1205 elsif (defined (my $outblock = fetch_block ($output)))
1207 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1208 print $child_in $outblock;
1212 Log (undef, "XXX fetch_block($output) failed XXX");
# Wait up to 2 minutes for arv-put to emit the collated locator.
1218 if (!defined $joboutput) {
1219 my $s = IO::Select->new($child_out);
1220 if ($s->can_read(120)) {
1221 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1224 Log (undef, "timed out reading from 'arv-put'");
1231 Log (undef, "output $joboutput");
1232 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1236 Log (undef, "output undef");
# Kill the given pids with escalating signals: SIGINT first, then SIGTERM
# after 4s, then SIGKILL after another 4s. Each signal is sent twice
# because srun needs two SIGINTs to actually interrupt.
1246 my $sig = 2; # SIGINT first
1247 if (exists $proc{$_}->{"sent_$sig"} &&
1248 time - $proc{$_}->{"sent_$sig"} > 4)
1250 $sig = 15; # SIGTERM if SIGINT doesn't work
1252 if (exists $proc{$_}->{"sent_$sig"} &&
1253 time - $proc{$_}->{"sent_$sig"} > 4)
1255 $sig = 9; # SIGKILL if SIGTERM doesn't work
1257 if (!exists $proc{$_}->{"sent_$sig"})
1259 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1261 select (undef, undef, undef, 0.1);
1264 kill $sig, $_; # srun wants two SIGINT to really interrupt
1266 $proc{$_}->{"sent_$sig"} = time;
1267 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1283 sub Log # ($jobstep_id, $logmessage)
1285 if ($_[1] =~ /\n/) {
1286 for my $line (split (/\n/, $_[1])) {
1291 my $fh = select STDERR; $|=1; select $fh;
1292 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1293 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1296 if ($local_logfile || -t STDERR) {
1297 my @gmtime = gmtime;
1298 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1299 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1301 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1303 if ($local_logfile) {
1304 print $local_logfile $datetime . " " . $message;
# Fatal-error path: log the message with caller location, checkpoint any
# remaining work, save the log to Keep, and mark the job as finished.
1311 my ($package, $file, $line) = caller;
1312 my $message = "@_ at $file line $line\n";
1313 Log (undef, $message);
1314 freeze() if @jobstep_todo;
1315 collate_output() if @jobstep_todo;
1317 save_meta() if $local_logfile;
# cleanup(): clear running state on the API-side Job record.
1324 return if !$job_has_uuid;
1325 $Job->update_attributes('running' => 0,
1327 'finished_at' => scalar gmtime);
# Upload the local log file to Keep via arv-put and record its locator on
# the Job. Checkpoint-only calls are a no-op.
1333 my $justcheckpoint = shift; # false if this will be the last meta saved
1334 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1336 $local_logfile->flush;
1337 my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1338 . quotemeta($local_logfile->filename);
1339 my $loglocator = `$cmd`;
1340 die "system $cmd failed: $?" if $?;
# Dropping the File::Temp reference removes the temp file on disk.
1343 $local_logfile = undef; # the temp file is automatically deleted
1344 Log (undef, "log manifest is $loglocator");
1345 $Job->{'log'} = $loglocator;
1346 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# If an operator requested a freeze, release the allocation, kill any pids
# passed in (plus all tracked procs), reap them, then checkpoint and exit.
1350 sub freeze_if_want_freeze
1352 if ($main::please_freeze)
1354 release_allocation();
1357 # kill some srun procs before freeze+stop
1358 map { $proc{$_} = {} } @_;
1361 killem (keys %proc);
1362 select (undef, undef, undef, 0.1);
1364 while (($died = waitpid (-1, WNOHANG)) > 0)
1366 delete $proc{$died};
# freeze()/thaw() are stubs: state snapshotting is not implemented here.
1381 Log (undef, "Freeze not implemented");
1388 croak ("Thaw not implemented");
# freezeunquote(): reverse the \n escaping applied by the freeze format.
1404 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run a command, prefixing it with the srun arguments only when running
# under SLURM. Optional $stdin data is piped to the command via a forked
# writer child. With $opts->{fork} the command runs via system(); otherwise
# this exec()s and does not return on success.
1411 my $srunargs = shift;
1412 my $execargs = shift;
1413 my $opts = shift || {};
1415 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1416 print STDERR (join (" ",
1417 map { / / ? "'$_'" : $_ }
1420 if $ENV{CRUNCH_DEBUG};
# Replace our STDIN with a pipe fed by a child that prints $stdin.
1422 if (defined $stdin) {
1423 my $child = open STDIN, "-|";
1424 defined $child or die "no fork: $!";
1426 print $stdin or die $!;
1427 close STDOUT or die $!;
1432 return system (@$args) if $opts->{fork};
# Reached only if exec failed; oversized environments are a known cause.
1435 warn "ENV size is ".length(join(" ",%ENV));
1436 die "exec failed: $!: @$args";
1440 sub ban_node_by_slot {
1441 # Don't start any new jobsteps on this node for 60 seconds
1443 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1444 $slot[$slotid]->{node}->{hold_count}++;
1445 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# must_lock_now(): take an exclusive non-blocking flock on $lockfile or die
# with the supplied message (used to serialize crunch-jobs per CRUNCH_TMP).
1450 my ($lockfile, $error_message) = @_;
1451 open L, ">", $lockfile or croak("$lockfile: $!");
1452 if (!flock L, LOCK_EX|LOCK_NB) {
1453 croak("Can't lock $lockfile: $error_message\n");
1457 sub find_docker_hash {
1458 # Given a Keep locator, search for a matching link to find the Docker hash
1459 # of the stored image.
1460 my $locator = shift;
1461 my $links_result = $arv->{links}->{list}->execute(
1462 filters => [["head_uuid", "=", $locator],
1463 ["link_class", "=", "docker_image_hash"]],
# Link names hold the image hash; the last matching link wins (loop below).
1466 foreach my $link (@{$links_result->{items}}) {
1467 $docker_hash = lc($link->{name});
1469 return $docker_hash;
# __DATA__ payload: the standalone checkout-and-build script that the main
# program streams to each node (followed by a git archive on the same
# stdin), as "perl -". It unpacks the source into $CRUNCH_SRC, builds the
# Python SDK, runs the project's install script, and records the installed
# commit in a "$destdir.commit" symlink so repeat installs are skipped.
1475 # checkout-and-build
1479 my $destdir = $ENV{"CRUNCH_SRC"};
1480 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1481 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Per-destdir lock; skip everything if this commit is already installed.
1483 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1485 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1489 unlink "$destdir.commit";
1490 open STDOUT, ">", "$destdir.log";
1491 open STDERR, ">&STDOUT";
# The git archive arrives on our stdin right after this script's __DATA__.
1494 my @git_archive_data = <DATA>;
1495 if (@git_archive_data) {
1496 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1497 print TARX @git_archive_data;
1499 die "'tar -C $destdir -xf -' exited $?: $!";
1504 chomp ($pwd = `pwd`);
1505 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Build the bundled Python SDK into a virtualenv.
1508 for my $src_path ("$destdir/arvados/sdk/python") {
1510 shell_or_die ("virtualenv", $install_dir);
1511 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run the project's install hook, preferring crunch_scripts/install.
1515 if (-e "$destdir/crunch_scripts/install") {
1516 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1517 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1519 shell_or_die ("./tests/autotests.sh", $install_dir);
1520 } elsif (-e "./install.sh") {
1521 shell_or_die ("./install.sh", $install_dir);
# Atomically record the installed commit via symlink + rename.
1525 unlink "$destdir.commit.new";
1526 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1527 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# shell_or_die(): run a shell command, echoing it when DEBUG is set, and
# die with the exit status in hex on failure.
1536 if ($ENV{"DEBUG"}) {
1537 print STDERR "@_\n";
1540 or die "@_ failed: $! exit 0x".sprintf("%x",$?);