# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
=head1 NAME

crunch-job: Execute job steps, save snapshots as requested, collate output.

=head1 SYNOPSIS

Obtain job details from Arvados, run tasks on compute nodes (typically
invoked by scheduler on controller):

 crunch-job --job x-y-z

Obtain job details from command line, run tasks on local machine
(typically invoked by application or developer on VM):

 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
=head1 OPTIONS

=over

=item --force-unlock

If the job is already locked, steal the lock and run it anyway.

=item --git-dir

Path to .git directory where the specified commit is found.

=item --job-api-token

Arvados API authorization token to use during the course of the job.

=item --no-clear-tmp

Do not clear per-job/task temporary directories during initial job
setup. This can speed up development and debugging when running jobs
locally.

=back
=head1 RUNNING JOBS LOCALLY

crunch-job's log messages appear on stderr along with the job tasks'
stderr streams. The log is saved in Keep at each checkpoint and when
the job finishes.

If the job succeeds, the job's output locator is printed on stdout.

While the job is running, the following signals are accepted:

=over

=item control-C, SIGINT, SIGQUIT

Save a checkpoint, terminate any job tasks that are running, and stop.

=item SIGALRM

Save a checkpoint and continue.

=item SIGHUP

Refresh node allocation (i.e., check whether any nodes have been added
or unallocated) and attributes of the Job record that should affect
behavior (e.g., cancel job if cancelled_at becomes non-nil).

=back
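For example, assuming a single crunch-job process that C<pgrep> can
find, a checkpoint can be requested from another shell without stopping
the job (an illustrative sketch, not part of crunch-job itself):

 kill -ALRM $(pgrep -f crunch-job)   # save a checkpoint and continue
 kill -HUP $(pgrep -f crunch-job)    # refresh node allocation and Job attributes

=cut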
use strict;
use POSIX ':sys_wait_h';
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Fcntl ':flock';
use Arvados;
use Digest::MD5 qw(md5_hex);
use Getopt::Long;
use IPC::Open2;
use IO::Select;
use File::Temp;
use File::Path qw( make_path );
use JSON;
87 $ENV{"TMPDIR"} ||= "/tmp";
88 unless (defined $ENV{"CRUNCH_TMP"}) {
89 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
90 if ($ENV{"USER"} ne "crunch" && $< != 0) {
91 # use a tmp dir unique for my uid
92 $ENV{"CRUNCH_TMP"} .= "-$<";
96 # Create the tmp directory if it does not exist
97 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
98 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
102 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
103 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
104 mkdir ($ENV{"JOB_WORK"});
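# Resulting layout (illustrative; for a non-root user with uid 1234 and
# the default TMPDIR):
#   $CRUNCH_TMP     = /tmp/crunch-job-1234
#   $JOB_WORK       = /tmp/crunch-job-1234/work  ($CRUNCH_WORK is a deprecated alias)
#   $CRUNCH_INSTALL = /tmp/crunch-job-1234/opt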
my ($force_unlock, $git_dir, $jobspec, $job_api_token, $no_clear_tmp, $resume_stash);
GetOptions('force-unlock' => \$force_unlock,
           'git-dir=s' => \$git_dir,
           'job=s' => \$jobspec,
           'job-api-token=s' => \$job_api_token,
           'no-clear-tmp' => \$no_clear_tmp,
           'resume-stash=s' => \$resume_stash,
    );

if (defined $job_api_token) {
  $ENV{ARVADOS_API_TOKEN} = $job_api_token;
}
my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
my $local_job = !$job_has_uuid;
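# A bare UUID (e.g. "qr1hi-8i9sb-xyzxyzxyzxyzxyz", an illustrative value)
# matches the pattern above and is fetched from the API server; anything
# else (such as the JSON blob shown in the SYNOPSIS) is treated as a
# local job spec.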
$SIG{'USR1'} = sub
{
  $main::ENV{CRUNCH_DEBUG} = 1;
};
$SIG{'USR2'} = sub
{
  $main::ENV{CRUNCH_DEBUG} = 0;
};
my $arv = Arvados->new('apiVersion' => 'v1');

my $User = $arv->{'users'}->{'current'}->execute;

my $Job = {};
my $job_id;
if ($job_has_uuid)
{
  $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
  if (!$force_unlock) {
    if ($Job->{'is_locked_by_uuid'}) {
      croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
    }
    if (defined $Job->{'success'}) {
      croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
    }
    if ($Job->{'running'}) {
      croak("Job 'running' flag is already set");
    }
    if ($Job->{'started_at'}) {
      croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
    }
  }
}
else
{
  $Job = JSON::decode_json($jobspec);

  map { croak ("No $_ specified") unless $Job->{$_} }
    qw(script script_version script_parameters);

  $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
  $Job->{'started_at'} = gmtime;

  $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
}

$job_id = $Job->{'uuid'};
my $keep_logfile = $job_id . '.log.txt';
my $local_logfile = File::Temp->new();

$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194 Log (undef, "check slurm allocation");
197 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
201 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
202 push @sinfo, "$localcpus localhost";
204 if (exists $ENV{SLURM_NODELIST})
206 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
foreach (@sinfo)
{
  my ($ncpus, $slurm_nodelist) = split;
  $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;

  my @nodelist;
  while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
  {
    my $nodelist = $1;
    if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
    {
      foreach (split (",", $1))
      {
        push @nodelist, map {
          my $n = $nodelist;
          $n =~ s/\[[-,\d]+\]/$_/;
          $n;
        } (/^(\d+)-(\d+)$/ ? ($1..$2) : ($_));
      }
    }
    else
    {
      push @nodelist, $nodelist;
    }
  }
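  # Example: a sinfo line "8 compute[0-2,5]" yields $ncpus == 8 and
  # @nodelist == ("compute0", "compute1", "compute2", "compute5").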
  foreach my $nodename (@nodelist)
  {
    Log (undef, "node $nodename - $ncpus slots");
    my $node = { name => $nodename,
                 ncpus => $ncpus,
                 losing_streak => 0,
                 hold_until => 0 };
    foreach my $cpu (1..$ncpus)
    {
      push @slot, { node => $node,
                    cpu => $cpu };
    }
  }
  push @node, @nodelist;
}
# Ensure that we get one jobstep running on each allocated node before
# we start overloading nodes with concurrent steps

@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
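# e.g. with nodes A and B at 2 CPUs each, all cpu-1 slots sort ahead of
# all cpu-2 slots, so the first steps dispatched land on distinct nodes.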
if ($job_has_uuid)
{
  # Claim this job, and make sure nobody else does
  unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
          $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
    croak("Error while updating / locking job");
  }
  $Job->update_attributes('started_at' => scalar gmtime,
                          'running' => 1,
                          'success' => undef,
                          'tasks_summary' => { 'failed' => 0,
                                               'todo' => 1,
                                               'running' => 0,
                                               'done' => 0 });
}
288 Log (undef, "start");
289 $SIG{'INT'} = sub { $main::please_freeze = 1; };
290 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
291 $SIG{'TERM'} = \&croak;
292 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
293 $SIG{'ALRM'} = sub { $main::please_info = 1; };
294 $SIG{'CONT'} = sub { $main::please_continue = 1; };
295 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
297 $main::please_freeze = 0;
298 $main::please_info = 0;
299 $main::please_continue = 0;
300 $main::please_refresh = 0;
301 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
303 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
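# e.g. a knobs line "NOCACHE.mytool=1" exports NOCACHE.mytool=1 into the
# environment of every task.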
304 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
305 $ENV{"JOB_UUID"} = $job_id;
309 my @jobstep_todo = ();
310 my @jobstep_done = ();
311 my @jobstep_tomerge = ();
312 my $jobstep_tomerge_level = 0;
314 my $squeue_kill_checked;
315 my $output_in_keep = 0;
316 my $latest_refresh = scalar time;
320 if (defined $Job->{thawedfromkey})
322 thaw ($Job->{thawedfromkey});
326 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
327 'job_uuid' => $Job->{'uuid'},
332 push @jobstep, { 'level' => 0,
334 'arvados_task' => $first_task,
336 push @jobstep_todo, 0;
if (!$have_slurm)
{
  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
}

my $build_script;

$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};

my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
if ($skip_install)
{
  if (!defined $no_clear_tmp) {
    my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
    system($clear_tmp_cmd) == 0
        or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
  }
  $ENV{"CRUNCH_SRC"} = $Job->{script_version};
  for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
    if (-d $src_path) {
      system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
          or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
      system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
          == 0
          or croak ("setup.py in $src_path failed: exit ".($?>>8));
    }
  }
}
else
{
  $build_script = do { local $/; <DATA> };  # slurp all of DATA, not just one line
  Log (undef, "Install revision ".$Job->{script_version});
  my $nodelist = join(",", @node);

  if (!defined $no_clear_tmp) {
    # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*

    my $cleanpid = fork();
    if ($cleanpid == 0)
    {
      srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
            ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
      exit (1);
    }
    while (1)
    {
      last if $cleanpid == waitpid (-1, WNOHANG);
      freeze_if_want_freeze ($cleanpid);
      select (undef, undef, undef, 0.1);
    }
    Log (undef, "Clean-work-dir exited $?");
  }
  # Install requested code version

  my @execargs;
  my @srunargs = ("srun",
                  "--nodelist=$nodelist",
                  "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");

  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
  my $commit;
  my $git_archive;
  my $treeish = $Job->{'script_version'};

  # If we're running under crunch-dispatch, it will have pulled the
  # appropriate source tree into its own repository, and given us that
  # repo's path as $git_dir. If we're running a "local" job, and a
  # script_version was specified, it's up to the user to provide the
  # full path to a local repository in Job->{repository}.
  #
  # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
  # git-archive --remote where appropriate.
  #
  # TODO: Accept a locally-hosted Arvados repository by name or
  # UUID. Use arvados.v1.repositories.list or .get to figure out the
  # appropriate fetch-url.
  my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};

  $ENV{"CRUNCH_SRC_URL"} = $repo;
428 if (-d "$repo/.git") {
429 # We were given a working directory, but we are only interested in
431 $repo = "$repo/.git";
434 # If this looks like a subversion r#, look for it in git-svn commit messages
436 if ($treeish =~ m{^\d{1,4}$}) {
437 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
439 if ($gitlog =~ /^[a-f0-9]{40}$/) {
441 Log (undef, "Using commit $commit for script_version $treeish");
  # If that didn't work, try asking git to look it up as a tree-ish.

  if (!defined $commit) {
    my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
    chomp $found;
    if ($found =~ /^[0-9a-f]{40}$/s) {
      $commit = $found;
      if ($commit ne $treeish) {
        # Make sure we record the real commit id in the database,
        # frozentokey, logs, etc. -- instead of an abbreviation or a
        # branch name which can become ambiguous or point to a
        # different commit in the future.
        $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
        Log (undef, "Using commit $commit for tree-ish $treeish");
        $Job->{'script_version'} = $commit;
        if ($job_has_uuid) {
          $Job->update_attributes('script_version' => $commit) or
              croak("Error while updating job");
        }
      }
    }
  }
  if (defined $commit) {
    $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
    @execargs = ("sh", "-c",
                 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
    $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
  }
  else {
    croak ("could not figure out commit id for $treeish");
  }
  my $installpid = fork();
  if ($installpid == 0)
  {
    srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
    exit (1);
  }
  while (1)
  {
    last if $installpid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($installpid);
    select (undef, undef, undef, 0.1);
  }
  Log (undef, "Install exited $?");
}
if (!$have_slurm)
{
  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
}
# If this job requires a Docker image, install that.
my $docker_bin = "/usr/bin/docker.io";
my ($docker_locator, $docker_stream, $docker_hash);
if ($docker_locator = $Job->{docker_image_locator}) {
  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
  if (!$docker_hash)
  {
    croak("No Docker image hash found from locator $docker_locator");
  }
  $docker_stream =~ s/^\.//;
  my $docker_install_script = qq{
if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
fi
};
  my $docker_pid = fork();
  if ($docker_pid == 0)
  {
    srun (["srun", "--nodelist=" . join(',', @node)],
          ["/bin/sh", "-ec", $docker_install_script]);
    exit ($?);
  }
  while (1)
  {
    last if $docker_pid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($docker_pid);
    select (undef, undef, undef, 0.1);
  }
  if ($? != 0)
  {
    croak("Installing Docker image from $docker_locator returned exit code $?");
  }
}
foreach (qw (script script_version script_parameters runtime_constraints))
{
  Log (undef,
       "$_ " .
       (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
}
foreach (split (/\n/, $Job->{knobs}))
{
  Log (undef, "knob " . $_);
}
$main::success = undef;

ONELEVEL:

my $thisround_succeeded = 0;
my $thisround_failed = 0;
my $thisround_failed_multiple = 0;

@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};
Log (undef, "start level $level");
my %proc;
my @freeslot = (0..$#slot);
my @holdslot;
my %reader;
my $progress_is_dirty = 1;
my $progress_stats_updated = 0;

update_progress_stats();
THISROUND:
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
  my $id = $jobstep_todo[$todo_ptr];
  my $Jobstep = $jobstep[$id];
  if ($Jobstep->{level} != $level)
  {
    next;
  }

  pipe $reader{$id}, "writer" or croak ($!);
  my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
  fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
  my $childslot = $freeslot[0];
  my $childnode = $slot[$childslot]->{node};
  my $childslotname = join (".",
                            $slot[$childslot]->{node}->{name},
                            $slot[$childslot]->{cpu});
  my $childpid = fork();
  if ($childpid == 0)
  {
    $SIG{'INT'} = 'DEFAULT';
    $SIG{'QUIT'} = 'DEFAULT';
    $SIG{'TERM'} = 'DEFAULT';

    foreach (values (%reader))
    {
      close($_);
    }
    fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
    open(STDOUT,">&writer");
    open(STDERR,">&writer");
612 delete $ENV{"GNUPGHOME"};
613 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
614 $ENV{"TASK_QSEQUENCE"} = $id;
615 $ENV{"TASK_SEQUENCE"} = $level;
616 $ENV{"JOB_SCRIPT"} = $Job->{script};
617 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
618 $param =~ tr/a-z/A-Z/;
619 $ENV{"JOB_PARAMETER_$param"} = $value;
621 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
622 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
623 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
624 $ENV{"HOME"} = $ENV{"TASK_WORK"};
625 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
626 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
627 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
628 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
634 "--nodelist=".$childnode->{name},
635 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
636 "--job-name=$job_id.$id.$$",
638 my $build_script_to_send = "";
640 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
641 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
642 ."&& cd $ENV{CRUNCH_TMP} ";
645 $build_script_to_send = $build_script;
649 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
652 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
653 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
654 # Dynamically configure the container to use the host system as its
655 # DNS server. Get the host's global addresses from the ip command,
656 # and turn them into docker --dns options using gawk.
658 q{$(ip -o address show scope global |
659 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
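      # e.g. an ip(8) output line "2: eth0    inet 10.0.0.5/24 brd ... scope global eth0"
      # carries "10.0.0.5/24" in field 4, so gawk emits "--dns 10.0.0.5".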
660 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
661 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
662 $command .= "--env=\QHOME=/home/crunch\E ";
663 while (my ($env_key, $env_val) = each %ENV)
665 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
666 if ($env_key eq "TASK_WORK") {
667 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
669 elsif ($env_key eq "TASK_KEEPMOUNT") {
670 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
673 $command .= "--env=\Q$env_key=$env_val\E ";
677 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
678 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
679 $command .= "\Q$docker_hash\E ";
680 $command .= "stdbuf --output=0 --error=0 ";
681 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
684 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
685 $command .= "stdbuf --output=0 --error=0 ";
686 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
    my @execargs = ('bash', '-c', $command);
    srun (\@srunargs, \@execargs, undef, $build_script_to_send);
    exit (111);
  }
  if (!defined $childpid)
  {
    close $reader{$id};
    delete $reader{$id};
    next;
  }
  shift @freeslot;
  $proc{$childpid} = { jobstep => $id,
                       time => time,
                       slot => $childslot,
                       jobstepname => "$job_id.$id.$childpid",
                     };
  croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
  $slot[$childslot]->{pid} = $childpid;
709 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
710 Log ($id, "child $childpid started on $childslotname");
711 $Jobstep->{starttime} = time;
712 $Jobstep->{node} = $childnode->{name};
713 $Jobstep->{slotindex} = $childslot;
714 delete $Jobstep->{stderr};
715 delete $Jobstep->{finishtime};
717 splice @jobstep_todo, $todo_ptr, 1;
720 $progress_is_dirty = 1;
  while (!@freeslot
         ||
         (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
  {
    last THISROUND if $main::please_freeze;
    if ($main::please_info)
    {
      $main::please_info = 0;
      freeze();
      collate_output();
      save_meta(1);
      update_progress_stats();
    }
    my $gotsome
        = readfrompipes ()
        + reapchildren ();
    if (!$gotsome)
    {
      check_refresh_wanted();
      check_squeue();
      update_progress_stats();
      select (undef, undef, undef, 0.1);
    }
    elsif (time - $progress_stats_updated >= 30)
    {
      update_progress_stats();
    }
    if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
        ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
    {
      my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
          .($thisround_failed+$thisround_succeeded)
          .") -- giving up on this round";
      Log (undef, $message);
      last THISROUND;
    }
    # move slots from freeslot to holdslot (or back to freeslot) if necessary
    for (my $i=$#freeslot; $i>=0; $i--) {
      if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
        push @holdslot, (splice @freeslot, $i, 1);
      }
    }
    for (my $i=$#holdslot; $i>=0; $i--) {
      if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
        push @freeslot, (splice @holdslot, $i, 1);
      }
    }
    # give up if no nodes are succeeding
    if (!grep { $_->{node}->{losing_streak} == 0 &&
                $_->{node}->{hold_count} < 4 } @slot) {
      my $message = "Every node has failed -- giving up on this round";
      Log (undef, $message);
      last THISROUND;
    }
  }
}

# give slots back to the freelist; we can't hold them any longer
push @freeslot, splice @holdslot;
map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
786 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
789 if ($main::please_continue) {
790 $main::please_continue = 0;
793 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
797 check_refresh_wanted();
799 update_progress_stats();
800 select (undef, undef, undef, 0.1);
801 killem (keys %proc) if $main::please_freeze;
805 update_progress_stats();
806 freeze_if_want_freeze();
if (!defined $main::success)
{
  if (@jobstep_todo &&
      $thisround_succeeded == 0 &&
      ($thisround_failed == 0 || $thisround_failed > 4))
  {
    my $message = "stop because $thisround_failed tasks failed and none succeeded";
    Log (undef, $message);
    $main::success = 0;
  }
  if (!@jobstep_todo)
  {
    $main::success = 1;
  }
}

goto ONELEVEL if !defined $main::success;
release_allocation();
freeze();
my $collated_output = &collate_output();

if ($job_has_uuid) {
  $Job->update_attributes('running' => 0,
                          'success' => $collated_output && $main::success,
                          'finished_at' => scalar gmtime)
}

if (!$collated_output) {
  Log(undef, "output undef");
}
else {
  eval {
    open(my $orig_manifest, '-|', 'arv-get', $collated_output)
        or die "failed to get collated manifest: $!";
    # Read the original manifest, and strip permission hints from it,
    # so we can put the result in a Collection.
    my @stripped_manifest_lines = ();
    my $orig_manifest_text = '';
    while (my $manifest_line = <$orig_manifest>) {
      $orig_manifest_text .= $manifest_line;
      my @words = split(/ /, $manifest_line, -1);
      foreach my $ii (0..$#words) {
        if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
          $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
        }
      }
      push(@stripped_manifest_lines, join(" ", @words));
    }
    my $stripped_manifest_text = join("", @stripped_manifest_lines);
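    # e.g. a locator "d41d8cd98f00b204e9800998ecf8427e+0+A<40-hex signature>@53bed294"
    # is stripped to "d41d8cd98f00b204e9800998ecf8427e+0" before hashing.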
    my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
      'uuid' => md5_hex($stripped_manifest_text),
      'manifest_text' => $orig_manifest_text,
    });
    Log(undef, "output " . $output->{uuid});
    $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
    if ($Job->{'output_is_persistent'}) {
      $arv->{'links'}->{'create'}->execute('link' => {
        'tail_kind' => 'arvados#user',
        'tail_uuid' => $User->{'uuid'},
        'head_kind' => 'arvados#collection',
        'head_uuid' => $Job->{'output'},
        'link_class' => 'resources',
        'name' => 'wants',
      });
    }
878 Log (undef, "Failed to register output manifest: $@");
882 Log (undef, "finish");
sub update_progress_stats
{
  $progress_stats_updated = time;
  return if !$progress_is_dirty;
  my ($todo, $done, $running) = (scalar @jobstep_todo,
                                 scalar @jobstep_done,
                                 scalar @slot - scalar @freeslot - scalar @holdslot);
  $Job->{'tasks_summary'} ||= {};
  $Job->{'tasks_summary'}->{'todo'} = $todo;
  $Job->{'tasks_summary'}->{'done'} = $done;
  $Job->{'tasks_summary'}->{'running'} = $running;
  if ($job_has_uuid) {
    $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
  }
  Log (undef, "status: $done done, $running running, $todo todo");
  $progress_is_dirty = 0;
}
sub reapchildren
{
  my $pid = waitpid (-1, WNOHANG);
  return 0 if $pid <= 0;

  my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
                  . "."
                  . $slot[$proc{$pid}->{slot}]->{cpu});
  my $jobstepid = $proc{$pid}->{jobstep};
  my $elapsed = time - $proc{$pid}->{time};
  my $Jobstep = $jobstep[$jobstepid];

  my $childstatus = $?;
  my $exitvalue = $childstatus >> 8;
  my $exitinfo = sprintf("exit %d signal %d%s",
                         $exitvalue,
                         $childstatus & 127,
                         ($childstatus & 128 ? ' core dump' : ''));
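  # e.g. $childstatus == 0x0B00 decodes as "exit 11 signal 0", while
  # 0x008B decodes as "exit 0 signal 11 core dump" (SIGSEGV, core bit 128).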
  $Jobstep->{'arvados_task'}->reload;
  my $task_success = $Jobstep->{'arvados_task'}->{success};

  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
  if (!defined $task_success) {
    # task did not indicate one way or the other --> fail
    $Jobstep->{'arvados_task'}->{success} = 0;
    $Jobstep->{'arvados_task'}->save;
    $task_success = 0;
  }

  if (!$task_success)
  {
    my $temporary_fail;
    $temporary_fail ||= $Jobstep->{node_fail};
    $temporary_fail ||= ($exitvalue == 111);

    ++$thisround_failed;
    ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
    # Check for signs of a failed or misconfigured node
    if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
        2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
      # Don't count this against jobstep failure thresholds if this
      # node is already suspected faulty and srun exited quickly
      if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
          $elapsed < 5) {
        Log ($jobstepid, "blaming failure on suspect node " .
             $slot[$proc{$pid}->{slot}]->{node}->{name});
        $temporary_fail ||= 1;
      }
      ban_node_by_slot($proc{$pid}->{slot});
    }
    Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                             ++$Jobstep->{'failures'},
                             $temporary_fail ? 'temporary' : 'permanent',
                             $elapsed));

    if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
      # Give up on this task, and the whole job
      $main::success = 0;
      $main::please_freeze = 1;
    }
    else {
      # Put this task back on the todo queue
      push @jobstep_todo, $jobstepid;
    }
    $Job->{'tasks_summary'}->{'failed'}++;
  }
  else
  {
    ++$thisround_succeeded;
    $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
    $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
    push @jobstep_done, $jobstepid;
    Log ($jobstepid, "success in $elapsed seconds");
  }
  $Jobstep->{exitcode} = $childstatus;
  $Jobstep->{finishtime} = time;
  process_stderr ($jobstepid, $task_success);
  Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});

  close $reader{$jobstepid};
  delete $reader{$jobstepid};
  delete $slot[$proc{$pid}->{slot}]->{pid};
  push @freeslot, $proc{$pid}->{slot};
  delete $proc{$pid};
  my $newtask_list = [];
  my $newtask_results;
  do {
    $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
      'where' => {
        'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
      },
      'order' => 'qsequence',
      'offset' => scalar(@$newtask_list),
    );
    push(@$newtask_list, @{$newtask_results->{items}});
  } while (@{$newtask_results->{items}});
  foreach my $arvados_task (@$newtask_list) {
    my $jobstep = {
      'level' => $arvados_task->{'sequence'},
      'failures' => 0,
      'arvados_task' => $arvados_task
    };
    push @jobstep, $jobstep;
    push @jobstep_todo, $#jobstep;
  }

  $progress_is_dirty = 1;
  return 1;
}
sub check_refresh_wanted
{
  my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
  if (@stat && $stat[9] > $latest_refresh) {
    $latest_refresh = scalar time;
    if ($job_has_uuid) {
      my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
      for my $attr ('cancelled_at',
                    'cancelled_by_user_uuid',
                    'cancelled_by_client_uuid') {
        $Job->{$attr} = $Job2->{$attr};
      }
      if ($Job->{'cancelled_at'}) {
        Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
             " by user " . $Job->{cancelled_by_user_uuid});
        $main::success = 0;
        $main::please_freeze = 1;
      }
    }
  }
}
sub check_squeue
{
  # return if the kill list was checked <4 seconds ago
  if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
  {
    return;
  }
  $squeue_kill_checked = time;

  # use killem() on procs whose killtime is reached
  foreach (keys %proc)
  {
    if (exists $proc{$_}->{killtime}
        && $proc{$_}->{killtime} <= time)
    {
      killem ($_);
    }
  }

  # return if the squeue was checked <60 seconds ago
  if (defined $squeue_checked && $squeue_checked > time - 60)
  {
    return;
  }
  $squeue_checked = time;
  if (!$have_slurm)
  {
    # here is an opportunity to check for mysterious problems with local procs
    return;
  }

  # get a list of steps still running
  my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
  chop @squeue;
  if ($squeue[-1] ne "ok")
  {
    return;
  }
  pop @squeue;
  # which of my jobsteps are running, according to squeue?
  my %ok;
  foreach (@squeue)
  {
    if (/^(\d+)\.(\d+) (\S+)/)
    {
      if ($1 eq $ENV{SLURM_JOBID})
      {
        $ok{$3} = 1;
      }
    }
  }

  # which of my active child procs (>60s old) were not mentioned by squeue?
  foreach (keys %proc)
  {
    if ($proc{$_}->{time} < time - 60
        && !exists $ok{$proc{$_}->{jobstepname}}
        && !exists $proc{$_}->{killtime})
    {
      # kill this proc if it hasn't exited in 30 seconds
      $proc{$_}->{killtime} = time + 30;
    }
  }
}
sub release_allocation
{
  if ($have_slurm)
  {
    Log (undef, "release job allocation");
    system "scancel $ENV{SLURM_JOBID}";
  }
}
sub readfrompipes
{
  my $gotsome = 0;
  foreach my $job (keys %reader)
  {
    my $buf;
    while (0 < sysread ($reader{$job}, $buf, 8192))
    {
      print STDERR $buf if $ENV{CRUNCH_DEBUG};
      $jobstep[$job]->{stderr} .= $buf;
      preprocess_stderr ($job);
      if (length ($jobstep[$job]->{stderr}) > 16384)
      {
        substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
      }
      $gotsome = 1;
    }
  }
  return $gotsome;
}
sub preprocess_stderr
{
  my $job = shift;

  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
    my $line = $1;
    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
    Log ($job, "stderr $line");
    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
      $main::please_freeze = 1;
    }
    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
      $jobstep[$job]->{node_fail} = 1;
      ban_node_by_slot($jobstep[$job]->{slotindex});
    }
  }
}
sub process_stderr
{
  my $job = shift;
  my $task_success = shift;
  preprocess_stderr ($job);

  map {
    Log ($job, "stderr $_");
  } split ("\n", $jobstep[$job]->{stderr});
}
sub fetch_block
{
  my $hash = shift;
  my ($keep, $child_out, $output_block);

  my $cmd = "arv-get \Q$hash\E";
  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
  $output_block = '';
  while (1) {
    my $buf;
    my $bytes = sysread($keep, $buf, 1024 * 1024);
    if (!defined $bytes) {
      die "reading from arv-get: $!";
    } elsif ($bytes == 0) {
      # sysread returns 0 at the end of the pipe.
      last;
    } else {
      # some bytes were read into buf.
      $output_block .= $buf;
    }
  }
  close $keep;
  return $output_block;
}
1206 Log (undef, "collate");
1208 my ($child_out, $child_in);
1209 my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1213 next if (!exists $_->{'arvados_task'}->{output} ||
1214 !$_->{'arvados_task'}->{'success'} ||
1215 $_->{'exitcode'} != 0);
1216 my $output = $_->{'arvados_task'}->{output};
1217 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1219 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1220 print $child_in $output;
1222 elsif (@jobstep == 1)
1224 $joboutput = $output;
1227 elsif (defined (my $outblock = fetch_block ($output)))
1229 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1230 print $child_in $outblock;
1234 Log (undef, "XXX fetch_block($output) failed XXX");
  if (!defined $joboutput) {
    my $s = IO::Select->new($child_out);
    if ($s->can_read(120)) {
      sysread($child_out, $joboutput, 64 * 1024 * 1024);
      chomp($joboutput);
    } else {
      Log (undef, "timed out reading from 'arv-put'");
    }
  }
  waitpid($pid, 0);

  return $joboutput;
}
sub killem
{
  foreach (@_)
  {
    my $sig = 2;                # SIGINT first
    if (exists $proc{$_}->{"sent_$sig"} &&
        time - $proc{$_}->{"sent_$sig"} > 4)
    {
      $sig = 15;                # SIGTERM if SIGINT doesn't work
    }
    if (exists $proc{$_}->{"sent_$sig"} &&
        time - $proc{$_}->{"sent_$sig"} > 4)
    {
      $sig = 9;                 # SIGKILL if SIGTERM doesn't work
    }
    if (!exists $proc{$_}->{"sent_$sig"})
    {
      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
      kill $sig, $_;
      select (undef, undef, undef, 0.1);
      if ($sig == 2)
      {
        kill $sig, $_;          # srun wants two SIGINT to really interrupt
      }
      $proc{$_}->{"sent_$sig"} = time;
      $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
    }
  }
}
sub fhbits
{
  my($bits);
  for (@_) {
    vec($bits,fileno($_),1) = 1;
  }
  $bits;
}
sub Log                         # ($jobstep_id, $logmessage)
{
  if ($_[1] =~ /\n/) {
    for my $line (split (/\n/, $_[1])) {
      Log ($_[0], $line);
    }
    return;
  }
  my $fh = select STDERR; $|=1; select $fh;
  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
  $message .= "\n";
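  # e.g. a message containing a tab or a UTF-8 byte is logged with "\011"
  # or "\303"-style octal escapes, keeping every log line printable ASCII.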
  my $datetime;
  if ($local_logfile || -t STDERR) {
    my @gmtime = gmtime;
    $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
                         $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
  }
  print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);

  if ($local_logfile) {
    print $local_logfile $datetime . " " . $message;
  }
}
sub croak
{
  my ($package, $file, $line) = caller;
  my $message = "@_ at $file line $line\n";
  Log (undef, $message);
  freeze() if @jobstep_todo;
  collate_output() if @jobstep_todo;
  cleanup();
  save_meta() if $local_logfile;
  die;
}

sub cleanup
{
  return if !$job_has_uuid;
  $Job->update_attributes('running' => 0,
                          'success' => 0,
                          'finished_at' => scalar gmtime);
}
sub save_meta
{
  my $justcheckpoint = shift; # false if this will be the last meta saved
  return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm

  $local_logfile->flush;
  my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
      . quotemeta($local_logfile->filename);
  my $loglocator = `$cmd`;
  die "system $cmd failed: $?" if $?;
  chomp($loglocator);

  $local_logfile = undef;   # the temp file is automatically deleted
  Log (undef, "log manifest is $loglocator");
  $Job->{'log'} = $loglocator;
  $Job->update_attributes('log' => $loglocator) if $job_has_uuid;
}
sub freeze_if_want_freeze
{
  if ($main::please_freeze)
  {
    release_allocation();
    if (@_)
    {
      # kill some srun procs before freeze+stop
      map { $proc{$_} = {} } @_;
      while (%proc)
      {
        killem (keys %proc);
        select (undef, undef, undef, 0.1);
        my $died;
        while (($died = waitpid (-1, WNOHANG)) > 0)
        {
          delete $proc{$died};
        }
      }
    }
    freeze();
    collate_output();
    cleanup();
    save_meta();
    exit 0;
  }
}
1394 Log (undef, "Freeze not implemented");
1401 croak ("Thaw not implemented");
1417 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
sub srun
{
  my $srunargs = shift;
  my $execargs = shift;
  my $opts = shift || {};
  my $stdin = shift;
  my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
  print STDERR (join (" ",
                      map { / / ? "'$_'" : $_ }
                      @$args),
                "\n")
      if $ENV{CRUNCH_DEBUG};

  if (defined $stdin) {
    my $child = open STDIN, "-|";
    defined $child or die "no fork: $!";
    if ($child == 0) {
      print $stdin or die $!;
      close STDOUT or die $!;
      exit 0;
    }
  }

  return system (@$args) if $opts->{fork};

  exec @$args;
  warn "ENV size is ".length(join(" ",%ENV));
  die "exec failed: $!: @$args";
}
sub ban_node_by_slot {
  # Don't start any new jobsteps on this node for 60 seconds
  my $slotid = shift;
  $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
  $slot[$slotid]->{node}->{hold_count}++;
  Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
}
sub must_lock_now
{
  my ($lockfile, $error_message) = @_;
  open L, ">", $lockfile or croak("$lockfile: $!");
  if (!flock L, LOCK_EX|LOCK_NB) {
    croak("Can't lock $lockfile: $error_message\n");
  }
}
sub find_docker_image {
  # Given a Keep locator, check to see if it contains a Docker image.
  # If so, return its stream name and Docker hash.
  # If not, return undef for both values.
  my $locator = shift;
  if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
    my @file_list = @{$image->{files}};
    if ((scalar(@file_list) == 1) &&
        ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
      return ($file_list[0][0], $1);
    }
  }
  return (undef, undef);
}
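# e.g. a collection whose file list is [[".", "<64-hex>.tar", <size>]]
# yields stream "." plus the 64-hex Docker image hash; the caller strips
# the leading "." from the stream name before building the Keep path.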
__DATA__
#!/usr/bin/perl

# checkout-and-build

use Fcntl ':flock';

my $destdir = $ENV{"CRUNCH_SRC"};
my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
1496 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1498 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1502 unlink "$destdir.commit";
1503 open STDOUT, ">", "$destdir.log";
1504 open STDERR, ">&STDOUT";
mkdir $destdir;
my @git_archive_data = <DATA>;
if (@git_archive_data) {
  open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
  print TARX @git_archive_data;
  if(!close(TARX)) {
    die "'tar -C $destdir -xf -' exited $?: $!";
  }
}
my $pwd;
chomp ($pwd = `pwd`);
my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
mkdir $install_dir;

for my $src_path ("$destdir/arvados/sdk/python") {
  if (-d $src_path) {
    shell_or_die ("virtualenv", $install_dir);
    shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
  }
}
1528 if (-e "$destdir/crunch_scripts/install") {
1529 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1530 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1532 shell_or_die ("./tests/autotests.sh", $install_dir);
1533 } elsif (-e "./install.sh") {
1534 shell_or_die ("./install.sh", $install_dir);
1538 unlink "$destdir.commit.new";
1539 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1540 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1549 if ($ENV{"DEBUG"}) {
1550 print STDERR "@_\n";
1553 or die "@_ failed: $! exit 0x".sprintf("%x",$?);