2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
# --- Module loads and process environment setup (sampled excerpt; some
# --- intervening lines are missing from this view).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Digest::MD5 qw(md5_hex);
# Default all temp/work locations under TMPDIR unless the caller already
# configured CRUNCH_TMP.
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# $< is the real uid; non-root, non-"crunch" users get a per-uid dir so
# concurrent users don't collide.
89 if ($ENV{"USER"} ne "crunch" && $< != 0) {
90 # use a tmp dir unique for my uid
91 $ENV{"CRUNCH_TMP"} .= "-$<";
# Derived locations: per-job work dir and install prefix for the SDK.
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
# Command-line options (see POD above for their meanings).
105 GetOptions('force-unlock' => \$force_unlock,
106 'git-dir=s' => \$git_dir,
107 'job=s' => \$jobspec,
108 'job-api-token=s' => \$job_api_token,
109 'no-clear-tmp' => \$no_clear_tmp,
110 'resume-stash=s' => \$resume_stash,
# An explicit --job-api-token overrides the ambient API credential.
113 if (defined $job_api_token) {
114 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# We are "under slurm" only when both allocation variables are present.
117 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A bare UUID means "fetch job from API server"; anything else (JSON) is a
# local one-off job.
118 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
119 my $local_job = !$job_has_uuid;
# CRUNCH_DEBUG toggles verbose stderr passthrough later on (the branch
# selecting 1 vs 0 is elided in this view).
124 $main::ENV{CRUNCH_DEBUG} = 1;
128 $main::ENV{CRUNCH_DEBUG} = 0;
# --- Connect to the API server and obtain the Job record, either by UUID
# --- (scheduler-invoked) or from inline JSON (local job).
133 my $arv = Arvados->new('apiVersion' => 'v1');
136 my $User = $arv->{'users'}->{'current'}->execute;
144 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Unless --force-unlock was given, refuse to touch a job that appears to be
# owned, finished, or already started by someone else.
145 if (!$force_unlock) {
146 if ($Job->{'is_locked_by_uuid'}) {
147 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# NOTE(review): `ne undef` is a string comparison against undef and warns
# under `use warnings`; this should be `defined $Job->{'success'}`.
149 if ($Job->{'success'} ne undef) {
150 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
152 if ($Job->{'running'}) {
153 croak("Job 'running' flag is already set");
155 if ($Job->{'started_at'}) {
156 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local-job path: parse the JSON job spec and create a fresh record.
162 $Job = JSON::decode_json($jobspec);
# map-for-side-effects: croak on any missing required field.
166 map { croak ("No $_ specified") unless $Job->{$_} }
167 qw(script script_version script_parameters);
170 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
171 $Job->{'started_at'} = gmtime;
173 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
177 $job_id = $Job->{'uuid'};
# Log goes to a temp file locally, then into Keep under this name.
179 my $keep_logfile = $job_id . '.log.txt';
180 $local_logfile = File::Temp->new();
# max_tasks_per_node==0 means "no limit".
182 $Job->{'runtime_constraints'} ||= {};
183 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
184 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# --- Build @node (node names) and @slot (one entry per cpu per node) from
# --- the slurm allocation, or fall back to localhost for local jobs.
187 Log (undef, "check slurm allocation");
190 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Local fallback: count processors from /proc/cpuinfo (min 1).
194 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
195 push @sinfo, "$localcpus localhost";
197 if (exists $ENV{SLURM_NODELIST})
# Each sinfo line is "<ncpus> <nodelist-expression>".
199 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
203 my ($ncpus, $slurm_nodelist) = split;
# Honor the job's max_tasks_per_node constraint, if any.
204 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Expand slurm's compressed nodelist syntax, e.g. "compute[1-3,5]".
207 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
210 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
213 foreach (split (",", $ranges))
# Substitute each expanded number back into the bracketed pattern.
226 push @nodelist, map {
228 $n =~ s/\[[-,\d]+\]/$_/;
235 push @nodelist, $nodelist;
238 foreach my $nodename (@nodelist)
240 Log (undef, "node $nodename - $ncpus slots");
241 my $node = { name => $nodename,
# One slot per cpu; slots are the unit of task scheduling below.
245 foreach my $cpu (1..$ncpus)
247 push @slot, { node => $node,
251 push @node, @nodelist;
266 # Claim this job, and make sure nobody else does
267 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
268 $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
269 croak("Error while updating / locking job");
271 $Job->update_attributes('started_at' => scalar gmtime,
274 'tasks_summary' => { 'failed' => 0,
281 Log (undef, "start");
282 $SIG{'INT'} = sub { $main::please_freeze = 1; };
283 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
284 $SIG{'TERM'} = \&croak;
285 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
286 $SIG{'ALRM'} = sub { $main::please_info = 1; };
287 $SIG{'CONT'} = sub { $main::please_continue = 1; };
288 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290 $main::please_freeze = 0;
291 $main::please_info = 0;
292 $main::please_continue = 0;
293 $main::please_refresh = 0;
294 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
296 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
297 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
298 $ENV{"JOB_UUID"} = $job_id;
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
313 if (defined $Job->{thawedfromkey})
315 thaw ($Job->{thawedfromkey});
319 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
320 'job_uuid' => $Job->{'uuid'},
325 push @jobstep, { 'level' => 0,
327 'arvados_task' => $first_task,
329 push @jobstep_todo, 0;
335 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# --- Local-job fast path: script_version is an absolute path, so use the
# --- tree in place instead of fetching from git.
342 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
344 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
# Unless --no-clear-tmp: wipe per-job work/install dirs for a clean start.
347 if (!defined $no_clear_tmp) {
348 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
349 system($clear_tmp_cmd) == 0
350 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
352 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
# Build the Python SDK into a job-private virtualenv (local host only).
353 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
355 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
356 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
357 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
359 or croak ("setup.py in $src_path failed: exit ".($?>>8));
388 Log (undef, "Clean-work-dir exited $?");
391 # Install requested code version
394 my @srunargs = ("srun",
395 "--nodelist=$nodelist",
396 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
398 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
399 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
403 my $treeish = $Job->{'script_version'};
405 # If we're running under crunch-dispatch, it will have pulled the
406 # appropriate source tree into its own repository, and given us that
407 # repo's path as $git_dir. If we're running a "local" job, and a
408 # script_version was specified, it's up to the user to provide the
409 # full path to a local repository in Job->{repository}.
411 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
412 # git-archive --remote where appropriate.
414 # TODO: Accept a locally-hosted Arvados repository by name or
415 # UUID. Use arvados.v1.repositories.list or .get to figure out the
416 # appropriate fetch-url.
417 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
419 $ENV{"CRUNCH_SRC_URL"} = $repo;
421 if (-d "$repo/.git") {
422 # We were given a working directory, but we are only interested in
424 $repo = "$repo/.git";
427 # If this looks like a subversion r#, look for it in git-svn commit messages
429 if ($treeish =~ m{^\d{1,4}$}) {
430 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
432 if ($gitlog =~ /^[a-f0-9]{40}$/) {
434 Log (undef, "Using commit $commit for script_version $treeish");
438 # If that didn't work, try asking git to look it up as a tree-ish.
440 if (!defined $commit) {
441 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
443 if ($found =~ /^[0-9a-f]{40}$/s) {
445 if ($commit ne $treeish) {
446 # Make sure we record the real commit id in the database,
447 # frozentokey, logs, etc. -- instead of an abbreviation or a
448 # branch name which can become ambiguous or point to a
449 # different commit in the future.
450 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
451 Log (undef, "Using commit $commit for tree-ish $treeish");
452 if ($commit ne $treeish) {
453 $Job->{'script_version'} = $commit;
455 $Job->update_attributes('script_version' => $commit) or
456 croak("Error while updating job");
462 if (defined $commit) {
463 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
464 @execargs = ("sh", "-c",
465 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
466 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
469 croak ("could not figure out commit id for $treeish");
472 my $installpid = fork();
473 if ($installpid == 0)
475 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
480 last if $installpid == waitpid (-1, WNOHANG);
481 freeze_if_want_freeze ($installpid);
482 select (undef, undef, undef, 0.1);
484 Log (undef, "Install exited $?");
489 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
490 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
493 # If this job requires a Docker image, install that.
494 my $docker_bin = "/usr/bin/docker.io";
495 my ($docker_locator, $docker_hash);
496 if ($docker_locator = $Job->{docker_image_locator}) {
# Resolve the Keep locator to the image hash via API links.
497 $docker_hash = find_docker_hash($docker_locator);
500 croak("No Docker image hash found from locator $docker_locator");
# Idempotent loader: only fetch from Keep if the image isn't present.
502 my $docker_install_script = qq{
503 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
504 arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
# Run the loader on every node, polling for completion as elsewhere.
507 my $docker_pid = fork();
508 if ($docker_pid == 0)
510 srun (["srun", "--nodelist=" . join(',', @node)],
511 ["/bin/sh", "-ec", $docker_install_script]);
516 last if $docker_pid == waitpid (-1, WNOHANG);
517 freeze_if_want_freeze ($docker_pid);
518 select (undef, undef, undef, 0.1);
522 croak("Installing Docker image from $docker_locator returned exit code $?");
# Record the effective job parameters and knobs in the log.
526 foreach (qw (script script_version script_parameters runtime_constraints))
530 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
532 foreach (split (/\n/, $Job->{knobs}))
534 Log (undef, "knob " . $_);
# --- Main scheduling loop. Tasks are run one "level" at a time; within a
# --- level the loop forks one srun child per free slot, then waits for
# --- children, handling freeze/info/refresh requests along the way.
539 $main::success = undef;
545 my $thisround_succeeded = 0;
546 my $thisround_failed = 0;
547 my $thisround_failed_multiple = 0;
# Process lowest level first; stable by task index within a level.
549 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
550 or $a <=> $b } @jobstep_todo;
551 my $level = $jobstep[$jobstep_todo[0]]->{level};
552 Log (undef, "start level $level");
557 my @freeslot = (0..$#slot);
560 my $progress_is_dirty = 1;
561 my $progress_stats_updated = 0;
563 update_progress_stats();
# Launch phase: walk the todo list, skipping tasks not at this level.
568 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
570 my $id = $jobstep_todo[$todo_ptr];
571 my $Jobstep = $jobstep[$id];
572 if ($Jobstep->{level} != $level)
# Non-blocking pipe from the child's stdout/stderr back to us.
577 pipe $reader{$id}, "writer" or croak ($!);
578 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
579 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
581 my $childslot = $freeslot[0];
582 my $childnode = $slot[$childslot]->{node};
583 my $childslotname = join (".",
584 $slot[$childslot]->{node}->{name},
585 $slot[$childslot]->{cpu});
586 my $childpid = fork();
# ---- Child: restore default signals, point stdout/stderr at the pipe,
# ---- build the task environment, and exec the task via srun.
589 $SIG{'INT'} = 'DEFAULT';
590 $SIG{'QUIT'} = 'DEFAULT';
591 $SIG{'TERM'} = 'DEFAULT';
593 foreach (values (%reader))
597 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
598 open(STDOUT,">&writer");
599 open(STDERR,">&writer");
604 delete $ENV{"GNUPGHOME"};
605 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
606 $ENV{"TASK_QSEQUENCE"} = $id;
607 $ENV{"TASK_SEQUENCE"} = $level;
608 $ENV{"JOB_SCRIPT"} = $Job->{script};
609 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
610 $param =~ tr/a-z/A-Z/;
611 $ENV{"JOB_PARAMETER_$param"} = $value;
613 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
614 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
615 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
616 $ENV{"HOME"} = $ENV{"TASK_WORK"};
617 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun pins the task to exactly one cpu on the chosen node.
626 "--nodelist=".$childnode->{name},
627 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628 "--job-name=$job_id.$id.$$",
630 my $build_script_to_send = "";
# Shell command assembled piecewise: prepare dirs, mount Keep, then either
# a docker invocation or a bare crunchstat-wrapped script run.
632 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
634 ."&& cd $ENV{CRUNCH_TMP} ";
637 $build_script_to_send = $build_script;
641 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
644 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
645 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
646 # Dynamically configure the container to use the host system as its
647 # DNS server. Get the host's global addresses from the ip command,
648 # and turn them into docker --dns options using gawk.
650 q{$(ip -o address show scope global |
651 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
652 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
653 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
654 $command .= "--env=\QHOME=/home/crunch\E ";
# Forward ARVADOS_/JOB_/TASK_ env vars into the container, rewriting the
# paths that differ inside the container.
655 while (my ($env_key, $env_val) = each %ENV)
657 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
658 if ($env_key eq "TASK_WORK") {
659 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
661 elsif ($env_key eq "TASK_KEEPMOUNT") {
662 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
664 elsif ($env_key eq "CRUNCH_SRC") {
665 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
668 $command .= "--env=\Q$env_key=$env_val\E ";
672 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
673 $command .= "\Q$docker_hash\E ";
674 $command .= "stdbuf --output=0 --error=0 ";
675 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-docker branch: run the script straight from CRUNCH_SRC.
678 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
679 $command .= "stdbuf --output=0 --error=0 ";
680 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
683 my @execargs = ('bash', '-c', $command);
684 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# ---- Parent: record the child in %proc and mark the slot busy.
688 if (!defined $childpid)
695 $proc{$childpid} = { jobstep => $id,
698 jobstepname => "$job_id.$id.$childpid",
700 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
701 $slot[$childslot]->{pid} = $childpid;
703 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
704 Log ($id, "child $childpid started on $childslotname");
705 $Jobstep->{starttime} = time;
706 $Jobstep->{node} = $childnode->{name};
707 $Jobstep->{slotindex} = $childslot;
708 delete $Jobstep->{stderr};
709 delete $Jobstep->{finishtime};
711 splice @jobstep_todo, $todo_ptr, 1;
714 $progress_is_dirty = 1;
# ---- Wait phase: reap children, service signal-requested actions, and
# ---- update progress at least every 30 seconds.
718 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
720 last THISROUND if $main::please_freeze;
721 if ($main::please_info)
723 $main::please_info = 0;
727 update_progress_stats();
734 check_refresh_wanted();
736 update_progress_stats();
737 select (undef, undef, undef, 0.1);
739 elsif (time - $progress_stats_updated >= 30)
741 update_progress_stats();
# Abort the round when repeated failures dominate.
743 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
744 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
746 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
747 .($thisround_failed+$thisround_succeeded)
748 .") -- giving up on this round";
749 Log (undef, $message);
753 # move slots from freeslot to holdslot (or back to freeslot) if necessary
754 for (my $i=$#freeslot; $i>=0; $i--) {
755 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
756 push @holdslot, (splice @freeslot, $i, 1);
759 for (my $i=$#holdslot; $i>=0; $i--) {
760 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
761 push @freeslot, (splice @holdslot, $i, 1);
765 # give up if no nodes are succeeding
766 if (!grep { $_->{node}->{losing_streak} == 0 &&
767 $_->{node}->{hold_count} < 4 } @slot) {
768 my $message = "Every node has failed -- giving up on this round";
769 Log (undef, $message);
# End of round: release holds and reset per-node losing streaks.
776 push @freeslot, splice @holdslot;
777 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
780 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
783 if ($main::please_continue) {
784 $main::please_continue = 0;
787 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
791 check_refresh_wanted();
793 update_progress_stats();
794 select (undef, undef, undef, 0.1);
795 killem (keys %proc) if $main::please_freeze;
799 update_progress_stats();
800 freeze_if_want_freeze();
# Decide whether to run another level (goto ONELEVEL) or stop.
803 if (!defined $main::success)
806 $thisround_succeeded == 0 &&
807 ($thisround_failed == 0 || $thisround_failed > 4))
809 my $message = "stop because $thisround_failed tasks failed and none succeeded";
810 Log (undef, $message);
819 goto ONELEVEL if !defined $main::success;
# --- Finish: release slurm allocation, collate task outputs into a single
# --- manifest, record success/output on the Job, and register a Collection.
822 release_allocation();
824 my $collated_output = &collate_output();
827 $Job->update_attributes('running' => 0,
828 'success' => $collated_output && $main::success,
829 'finished_at' => scalar gmtime)
832 if ($collated_output)
# Fetch the collated manifest back from Keep via arv-get.
835 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
836 or die "failed to get collated manifest: $!";
837 # Read the original manifest, and strip permission hints from it,
838 # so we can put the result in a Collection.
839 my @stripped_manifest_lines = ();
840 my $orig_manifest_text = '';
841 while (my $manifest_line = <$orig_manifest>) {
842 $orig_manifest_text .= $manifest_line;
843 my @words = split(/ /, $manifest_line, -1);
844 foreach my $ii (0..$#words) {
845 if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
# Drop "+A<signature>@<expiry>" permission hints from block locators.
846 $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
849 push(@stripped_manifest_lines, join(" ", @words));
851 my $stripped_manifest_text = join("", @stripped_manifest_lines);
# Collection uuid is the md5 of the *stripped* text; stored text keeps hints.
852 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
853 'uuid' => md5_hex($stripped_manifest_text),
854 'manifest_text' => $orig_manifest_text,
856 $Job->update_attributes('output' => $output->{uuid});
# Optionally pin the output with a resources link owned by the user.
857 if ($Job->{'output_is_persistent'}) {
858 $arv->{'links'}->{'create'}->execute('link' => {
859 'tail_kind' => 'arvados#user',
860 'tail_uuid' => $User->{'uuid'},
861 'head_kind' => 'arvados#collection',
862 'head_uuid' => $Job->{'output'},
863 'link_class' => 'resources',
869 Log (undef, "Failed to register output manifest: $@");
873 Log (undef, "finish");
# Push todo/done/running counts into the Job's tasks_summary and log a
# status line. No-op unless something changed since the last update
# ($progress_is_dirty); always refreshes $progress_stats_updated so the
# 30-second poll timer in the main loop resets either way.
880 sub update_progress_stats
882 $progress_stats_updated = time;
883 return if !$progress_is_dirty;
# running = slots that are neither free nor on hold.
884 my ($todo, $done, $running) = (scalar @jobstep_todo,
885 scalar @jobstep_done,
886 scalar @slot - scalar @freeslot - scalar @holdslot);
887 $Job->{'tasks_summary'} ||= {};
888 $Job->{'tasks_summary'}->{'todo'} = $todo;
889 $Job->{'tasks_summary'}->{'done'} = $done;
890 $Job->{'tasks_summary'}->{'running'} = $running;
892 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
894 Log (undef, "status: $done done, $running running, $todo todo");
895 $progress_is_dirty = 0;
# --- Reap one exited task child (non-blocking). Classifies the task as
# --- success / temporary failure / permanent failure, updates per-node
# --- failure streaks, requeues temporary failures, and pulls any tasks the
# --- child created into the scheduling queue. (The `sub` header line is
# --- not visible in this excerpt.)
902 my $pid = waitpid (-1, WNOHANG);
903 return 0 if $pid <= 0;
905 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
907 . $slot[$proc{$pid}->{slot}]->{cpu});
908 my $jobstepid = $proc{$pid}->{jobstep};
909 my $elapsed = time - $proc{$pid}->{time};
910 my $Jobstep = $jobstep[$jobstepid];
# $? from waitpid: high byte = exit value, low bits = signal/core.
912 my $childstatus = $?;
913 my $exitvalue = $childstatus >> 8;
914 my $exitinfo = sprintf("exit %d signal %d%s",
917 ($childstatus & 128 ? ' core dump' : ''));
# The task record is the source of truth for success, not the exit code.
918 $Jobstep->{'arvados_task'}->reload;
919 my $task_success = $Jobstep->{'arvados_task'}->{success};
921 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
923 if (!defined $task_success) {
924 # task did not indicate one way or the other --> fail
925 $Jobstep->{'arvados_task'}->{success} = 0;
926 $Jobstep->{'arvados_task'}->save;
# Exit value 111 and srun-reported node failures count as temporary.
933 $temporary_fail ||= $Jobstep->{node_fail};
934 $temporary_fail ||= ($exitvalue == 111);
937 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
939 # Check for signs of a failed or misconfigured node
940 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
941 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
942 # Don't count this against jobstep failure thresholds if this
943 # node is already suspected faulty and srun exited quickly
944 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
946 Log ($jobstepid, "blaming failure on suspect node " .
947 $slot[$proc{$pid}->{slot}]->{node}->{name});
948 $temporary_fail ||= 1;
950 ban_node_by_slot($proc{$pid}->{slot});
953 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
954 ++$Jobstep->{'failures'},
955 $temporary_fail ? 'temporary ' : 'permanent',
# Three strikes (or any permanent failure) fails the whole job.
958 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
959 # Give up on this task, and the whole job
961 $main::please_freeze = 1;
964 # Put this task back on the todo queue
965 push @jobstep_todo, $jobstepid;
967 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: reset the node's streak and mark the step done.
971 ++$thisround_succeeded;
972 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
973 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
974 push @jobstep_done, $jobstepid;
975 Log ($jobstepid, "success in $elapsed seconds");
977 $Jobstep->{exitcode} = $childstatus;
978 $Jobstep->{finishtime} = time;
979 process_stderr ($jobstepid, $task_success);
980 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Free the slot and drop the pipe for this step.
982 close $reader{$jobstepid};
983 delete $reader{$jobstepid};
984 delete $slot[$proc{$pid}->{slot}]->{pid};
985 push @freeslot, $proc{$pid}->{slot};
989 my $newtask_list = [];
# Page through job_tasks created by this task and enqueue them.
992 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
994 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
996 'order' => 'qsequence',
997 'offset' => scalar(@$newtask_list),
999 push(@$newtask_list, @{$newtask_results->{items}});
1000 } while (@{$newtask_results->{items}});
1001 foreach my $arvados_task (@$newtask_list) {
1003 'level' => $arvados_task->{'sequence'},
1005 'arvados_task' => $arvados_task
1007 push @jobstep, $jobstep;
1008 push @jobstep_todo, $#jobstep;
1011 $progress_is_dirty = 1;
# Re-read cancellation attributes from the API server when the refresh
# trigger file has been touched since our last check; request a freeze if
# the job was cancelled. Followed by a squeue-reconciliation fragment
# (its `sub` header is not visible in this excerpt).
1015 sub check_refresh_wanted
1017 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# stat[9] is mtime; only refresh when the trigger is newer than last check.
1018 if (@stat && $stat[9] > $latest_refresh) {
1019 $latest_refresh = scalar time;
1020 if ($job_has_uuid) {
1021 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1022 for my $attr ('cancelled_at',
1023 'cancelled_by_user_uuid',
1024 'cancelled_by_client_uuid') {
1025 $Job->{$attr} = $Job2->{$attr};
1027 if ($Job->{'cancelled_at'}) {
1028 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1029 " by user " . $Job->{cancelled_by_user_uuid});
1031 $main::please_freeze = 1;
# --- squeue reconciliation: kill children whose steps slurm no longer
# --- reports. Both checks are rate-limited (kill list 4s, squeue 60s).
1039 # return if the kill list was checked <4 seconds ago
1040 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1044 $squeue_kill_checked = time;
1046 # use killem() on procs whose killtime is reached
1049 if (exists $proc{$_}->{killtime}
1050 && $proc{$_}->{killtime} <= time)
1056 # return if the squeue was checked <60 seconds ago
1057 if (defined $squeue_checked && $squeue_checked > time - 60)
1061 $squeue_checked = time;
1065 # here is an opportunity to check for mysterious problems with local procs
1069 # get a list of steps still running
# "&& echo ok" appends a sentinel so a failed squeue run is detectable.
1070 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1072 if ($squeue[-1] ne "ok")
1078 # which of my jobsteps are running, according to squeue?
1082 if (/^(\d+)\.(\d+) (\S+)/)
1084 if ($1 eq $ENV{SLURM_JOBID})
1091 # which of my active child procs (>60s old) were not mentioned by squeue?
1092 foreach (keys %proc)
1094 if ($proc{$_}->{time} < time - 60
1095 && !exists $ok{$proc{$_}->{jobstepname}}
1096 && !exists $proc{$_}->{killtime})
1098 # kill this proc if it hasn't exited in 30 seconds
1099 $proc{$_}->{killtime} = time + 30;
# release_allocation: cancel our slurm allocation (when under slurm).
1105 sub release_allocation
1109 Log (undef, "release job allocation");
1110 system "scancel $ENV{SLURM_JOBID}";
# --- Pipe-draining fragment: pull available bytes from each child's
# --- non-blocking pipe into the step's stderr buffer, scanning it for
# --- srun error patterns and capping the buffer at 16KB.
1118 foreach my $job (keys %reader)
1121 while (0 < sysread ($reader{$job}, $buf, 8192))
1123 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1124 $jobstep[$job]->{stderr} .= $buf;
1125 preprocess_stderr ($job);
1126 if (length ($jobstep[$job]->{stderr}) > 16384)
# Drop the oldest 8KB to bound memory per step.
1128 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: log each complete stderr line and react to srun
# errors — allocation expiry freezes the job; node failure marks the step
# for retry and bans the node's slot.
1137 sub preprocess_stderr
1141 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1143 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1144 Log ($job, "stderr $line");
# NOTE(review): this tests $ENV{SLURM_JOB_ID}, but the rest of the file
# uses SLURM_JOBID — confirm which variable slurm actually sets here.
1145 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1147 $main::please_freeze = 1;
1149 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1150 $jobstep[$job]->{node_fail} = 1;
1151 ban_node_by_slot($jobstep[$job]->{slotindex});
# --- process_stderr fragment: flush any remaining buffered stderr lines
# --- for a finished step (sub header elided).
1160 my $task_success = shift;
1161 preprocess_stderr ($job);
1164 Log ($job, "stderr $_");
1165 } split ("\n", $jobstep[$job]->{stderr});
# fetch_block: read one output block (up to 64MB) from Keep via arv-get.
1171 my ($keep, $child_out, $output_block);
1173 my $cmd = "arv-get \Q$hash\E";
1174 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1175 sysread($keep, $output_block, 64 * 1024 * 1024);
1177 return $output_block;
# --- collate_output fragment (sub header elided): feed each successful
# --- task's output (a manifest fragment, or a block fetched from Keep)
# --- into `arv-put --raw`, then read back the combined locator.
1182 Log (undef, "collate");
1184 my ($child_out, $child_in);
1185 my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
# Only tasks that succeeded and exited 0 contribute output.
1189 next if (!exists $_->{'arvados_task'}->{output} ||
1190 !$_->{'arvados_task'}->{'success'} ||
1191 $_->{'exitcode'} != 0);
1192 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare block locator is treated as manifest text.
1193 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1195 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1196 print $child_in $output;
# Single-task job: its locator IS the job output, no collation needed.
1198 elsif (@jobstep == 1)
1200 $joboutput = $output;
# Otherwise resolve the locator to its content and pipe that in.
1203 elsif (defined (my $outblock = fetch_block ($output)))
1205 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1206 print $child_in $outblock;
1210 Log (undef, "XXX fetch_block($output) failed XXX");
# Wait up to 120s for arv-put to emit the collated locator.
1216 if (!defined $joboutput) {
1217 my $s = IO::Select->new($child_out);
1218 if ($s->can_read(120)) {
1219 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1222 Log (undef, "timed out reading from 'arv-put'");
1229 Log (undef, "output $joboutput");
1230 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1234 Log (undef, "output undef");
# --- killem fragment (sub header elided): escalate INT -> TERM -> KILL,
# --- waiting >4s between escalations; srun needs two SIGINTs delivered
# --- close together to actually interrupt.
1244 my $sig = 2; # SIGINT first
1245 if (exists $proc{$_}->{"sent_$sig"} &&
1246 time - $proc{$_}->{"sent_$sig"} > 4)
1248 $sig = 15; # SIGTERM if SIGINT doesn't work
1250 if (exists $proc{$_}->{"sent_$sig"} &&
1251 time - $proc{$_}->{"sent_$sig"} > 4)
1253 $sig = 9; # SIGKILL if SIGTERM doesn't work
1255 if (!exists $proc{$_}->{"sent_$sig"})
1257 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1259 select (undef, undef, undef, 0.1);
1262 kill $sig, $_; # srun wants two SIGINT to really interrupt
1264 $proc{$_}->{"sent_$sig"} = time;
1265 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# --- fhbits fragment: build a select() bit vector from filehandles.
1275 vec($bits,fileno($_),1) = 1;
# Log: emit "<job_id> <pid> <jobstep> <message>" with non-printable bytes
# octal-escaped, to stderr (timestamped when interactive) and to the local
# log file when one is open. Multi-line messages are split per line.
1281 sub Log # ($jobstep_id, $logmessage)
1283 if ($_[1] =~ /\n/) {
1284 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered, then restore the previously-selected handle.
1289 my $fh = select STDERR; $|=1; select $fh;
1290 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1291 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1294 if ($local_logfile || -t STDERR) {
1295 my @gmtime = gmtime;
1296 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1297 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1299 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1301 if ($local_logfile) {
1302 print $local_logfile $datetime . " " . $message;
# --- croak/cleanup fragment (sub header elided): log the fatal message
# --- with caller location, checkpoint any pending work, save the log, and
# --- mark the job finished-unsuccessfully on the API server.
1309 my ($package, $file, $line) = caller;
1310 my $message = "@_ at $file line $line\n";
1311 Log (undef, $message);
1312 freeze() if @jobstep_todo;
1313 collate_output() if @jobstep_todo;
1315 save_meta() if $local_logfile;
1322 return if !$job_has_uuid;
1323 $Job->update_attributes('running' => 0,
1325 'finished_at' => scalar gmtime);
# save_meta: upload the local log file to Keep (named "<job_id>.log.txt")
# and record the resulting manifest locator on the Job.
1331 my $justcheckpoint = shift; # false if this will be the last meta saved
1332 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1334 $local_logfile->flush;
1335 my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1336 . quotemeta($local_logfile->filename);
1337 my $loglocator = `$cmd`;
1338 die "system $cmd failed: $?" if $?;
# Dropping the File::Temp object deletes the on-disk temp file.
1341 $local_logfile = undef; # the temp file is automatically deleted
1342 Log (undef, "log manifest is $loglocator");
1343 $Job->{'log'} = $loglocator;
1344 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# freeze_if_want_freeze: if a freeze was requested, release the slurm
# allocation, kill the given srun children (plus anything left in %proc),
# and reap them before checkpointing/stopping.
1348 sub freeze_if_want_freeze
1350 if ($main::please_freeze)
1352 release_allocation();
1355 # kill some srun procs before freeze+stop
1356 map { $proc{$_} = {} } @_;
1359 killem (keys %proc);
1360 select (undef, undef, undef, 0.1);
1362 while (($died = waitpid (-1, WNOHANG)) > 0)
1364 delete $proc{$died};
# freeze/thaw are stubs post-Warehouse.pm: freeze only logs, thaw croaks.
1379 Log (undef, "Freeze not implemented");
1386 croak ("Thaw not implemented");
# --- unescape fragment: reverse the "\nnn"/"\n" escaping used in frozen
# --- state strings.
1402 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# srun ($srunargs, $execargs, $opts, $stdin): run a command, prefixing the
# srun arguments only when running under slurm; optionally feed $stdin to
# the child and either fork (opts->{fork}) or exec in place.
1409 my $srunargs = shift;
1410 my $execargs = shift;
1411 my $opts = shift || {};
1413 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1414 print STDERR (join (" ",
1415 map { / / ? "'$_'" : $_ }
1418 if $ENV{CRUNCH_DEBUG};
# Pipe $stdin to ourselves via a forked writer on STDIN.
1420 if (defined $stdin) {
1421 my $child = open STDIN, "-|";
1422 defined $child or die "no fork: $!";
1424 print $stdin or die $!;
1425 close STDOUT or die $!;
1430 return system (@$args) if $opts->{fork};
# exec failed if we reach here; the ENV size hint helps diagnose E2BIG.
1433 warn "ENV size is ".length(join(" ",%ENV));
1434 die "exec failed: $!: @$args";
# ban_node_by_slot: put the slot's node on hold for 60 seconds and bump its
# hold_count (nodes with hold_count >= 4 are considered lost — see the
# "every node has failed" check in the main loop).
1438 sub ban_node_by_slot {
1439 # Don't start any new jobsteps on this node for 60 seconds
1441 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1442 $slot[$slotid]->{node}->{hold_count}++;
1443 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# --- must_lock_now fragment (sub header elided): take a non-blocking
# --- exclusive flock on $lockfile or croak with $error_message.
1448 my ($lockfile, $error_message) = @_;
1449 open L, ">", $lockfile or croak("$lockfile: $!");
1450 if (!flock L, LOCK_EX|LOCK_NB) {
1451 croak("Can't lock $lockfile: $error_message\n");
1455 sub find_docker_hash {
1456 # Given a Keep locator, search for a matching link to find the Docker hash
1457 # of the stored image.
1458 my $locator = shift;
1459 my $links_result = $arv->{links}->{list}->execute(
1460 filters => [["head_uuid", "=", $locator],
1461 ["link_class", "=", "docker_image_hash"]],
# If several links match, the last one wins (loop overwrites).
1464 foreach my $link (@{$links_result->{items}}) {
1465 $docker_hash = lc($link->{name});
1467 return $docker_hash;
# --- Worker-side build script (shipped through DATA to each node, run as
# --- `perl -` with the git archive appended after it on stdin). Unpacks
# --- the source into CRUNCH_SRC, builds the Python SDK, runs the tree's
# --- install hook, and records the installed commit in a symlink.
1473 # checkout-and-build
1477 my $destdir = $ENV{"CRUNCH_SRC"};
1478 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1479 my $repo = $ENV{"CRUNCH_SRC_URL"};
1481 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Skip the whole install when this commit is already unpacked here.
1483 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1487 unlink "$destdir.commit";
1488 open STDOUT, ">", "$destdir.log";
1489 open STDERR, ">&STDOUT";
# Everything after this script on stdin (via DATA) is the git archive.
1492 my @git_archive_data = <DATA>;
1493 if (@git_archive_data) {
1494 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1495 print TARX @git_archive_data;
1497 die "'tar -C $destdir -xf -' exited $?: $!";
1502 chomp ($pwd = `pwd`);
1503 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Build the Python SDK into the install virtualenv (mirrors the local path
# install done in the parent script).
1506 for my $src_path ("$destdir/arvados/sdk/python") {
1508 shell_or_die ("virtualenv", $install_dir);
1509 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Prefer the tree's crunch_scripts/install hook; fall back to autotests or
# a top-level install.sh.
1513 if (-e "$destdir/crunch_scripts/install") {
1514 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1515 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1517 shell_or_die ("./tests/autotests.sh", $install_dir);
1518 } elsif (-e "./install.sh") {
1519 shell_or_die ("./install.sh", $install_dir);
# Atomically publish "this commit is installed" via symlink rename.
1523 unlink "$destdir.commit.new";
1524 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1525 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# shell_or_die fragment: echo the command when DEBUG, die on failure.
1534 if ($ENV{"DEBUG"}) {
1535 print STDERR "@_\n";
1538 or die "@_ failed: $! exit 0x".sprintf("%x",$?);