2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by the scheduler on the controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from the command line, run tasks on the local machine
16 (typically invoked by an application or developer on a VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
71 Save a checkpoint and continue.
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
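For example, to ask a running crunch-job to save a checkpoint and keep
going, or to re-check its node allocation and Job record, from another
shell (an illustration based on the signal handlers installed below;
substitute the real crunch-job process id):

  kill -ALRM <crunch-job-pid>
  kill -HUP <crunch-job-pid>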
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
90 use Digest::MD5 qw(md5_hex);
96 use File::Path qw( make_path remove_tree );
98 use constant EX_TEMPFAIL => 75;
100 $ENV{"TMPDIR"} ||= "/tmp";
101 unless (defined $ENV{"CRUNCH_TMP"}) {
102 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
103 if ($ENV{"USER"} ne "crunch" && $< != 0) {
104 # use a tmp dir unique for my uid
105 $ENV{"CRUNCH_TMP"} .= "-$<";
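# Illustrative: with the default TMPDIR, a non-root user other than "crunch"
# with uid 1000 ends up with CRUNCH_TMP=/tmp/crunch-job-1000, so concurrent
# users on the same machine get separate scratch directories.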
109 # Create the tmp directory if it does not exist
110 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
111 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory (" . $ENV{"CRUNCH_TMP"} . "): $!";
114 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
115 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
116 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
117 mkdir ($ENV{"JOB_WORK"});
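# Resulting layout under $CRUNCH_TMP (illustrative):
#   work/ -> $JOB_WORK (also exported as the deprecated $CRUNCH_WORK)
#   opt/  -> $CRUNCH_INSTALL
# with src*, task/ and .lock created further below.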
125 GetOptions('force-unlock' => \$force_unlock,
126 'git-dir=s' => \$git_dir,
127 'job=s' => \$jobspec,
128 'job-api-token=s' => \$job_api_token,
129 'no-clear-tmp' => \$no_clear_tmp,
130 'resume-stash=s' => \$resume_stash,
133 if (defined $job_api_token) {
134 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
137 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
143 $main::ENV{CRUNCH_DEBUG} = 1;
147 $main::ENV{CRUNCH_DEBUG} = 0;
152 my $arv = Arvados->new('apiVersion' => 'v1');
160 my $User = api_call("users/current");
162 if ($jobspec =~ /^[-a-z\d]+$/)
164 # $jobspec is an Arvados UUID, not a JSON job specification
165 $Job = api_call("jobs/get", uuid => $jobspec);
166 if (!$force_unlock) {
167 # Claim this job, and make sure nobody else does
168 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
170 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
177 $Job = JSON::decode_json($jobspec);
181 map { croak ("No $_ specified") unless $Job->{$_} }
182 qw(script script_version script_parameters);
185 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186 $Job->{'started_at'} = gmtime;
187 $Job->{'state'} = 'Running';
189 $Job = api_call("jobs/create", job => $Job);
191 $job_id = $Job->{'uuid'};
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
201 Log (undef, "check slurm allocation");
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
208 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209 push @sinfo, "$localcpus localhost";
211 if (exists $ENV{SLURM_NODELIST})
213 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 my ($ncpus, $slurm_nodelist) = split;
218 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
221 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
224 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
227 foreach (split (",", $ranges))
240 push @nodelist, map {
242 $n =~ s/\[[-,\d]+\]/$_/;
249 push @nodelist, $nodelist;
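# Illustrative: a sinfo line such as "8 compute[0-2,5]" yields $ncpus=8
# (capped by max_tasks_per_node if set) and node entries compute0, compute1,
# compute2 and compute5 from the bracket expansion above.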
252 foreach my $nodename (@nodelist)
254 Log (undef, "node $nodename - $ncpus slots");
255 my $node = { name => $nodename,
259 foreach my $cpu (1..$ncpus)
261 push @slot, { node => $node,
265 push @node, @nodelist;
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
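# Illustrative: sorting by cpu index puts every node's slot 0 ahead of any
# node's slot 1, so with two 2-CPU nodes the first two tasks land on
# different nodes before either node runs a second concurrent task.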
276 $Job->update_attributes(
277 'tasks_summary' => { 'failed' => 0,
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
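# Illustrative: a knobs line such as "NOCACHE.foo=1" sets
# $ENV{"NOCACHE.foo"} = "1"; other knob lines are only logged further below.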
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
313 if (defined $Job->{thawedfromkey})
315 thaw ($Job->{thawedfromkey});
319 my $first_task = api_call("job_tasks/create", job_task => {
320 'job_uuid' => $Job->{'uuid'},
325 push @jobstep, { 'level' => 0,
327 'arvados_task' => $first_task,
329 push @jobstep_todo, 0;
335 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
342 $build_script = <DATA>;
344 my $nodelist = join(",", @node);
346 if (!defined $no_clear_tmp) {
347 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
348 Log (undef, "Clean work dirs");
350 my $cleanpid = fork();
353 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
354 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
359 last if $cleanpid == waitpid (-1, WNOHANG);
360 freeze_if_want_freeze ($cleanpid);
361 select (undef, undef, undef, 0.1);
363 Log (undef, "Cleanup command exited ".exit_status_s($?));
368 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
369 # If script_version looks like an absolute path, *and* the --git-dir
370 # argument was not given -- which implies we were not invoked by
371 # crunch-dispatch -- we will use the given path as a working
372 # directory instead of resolving script_version to a git commit (or
373 # doing anything else with git).
374 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
375 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
378 # Resolve the given script_version to a git commit sha1. Also, if
379 # the repository is remote, clone it into our local filesystem: this
380 # ensures "git archive" will work, and is necessary to reliably
381 # resolve a symbolic script_version like "master^".
382 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
384 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
386 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
388 # If we're running under crunch-dispatch, it will have already
389 # pulled the appropriate source tree into its own repository, and
390 # given us that repo's path as $git_dir.
392 # If we're running a "local" job, we might have to fetch content
393 # from a remote repository.
395 # (Currently crunch-dispatch gives a local path with --git-dir, but
396 # we might as well accept URLs there too in case it changes its
398 my $repo = $git_dir || $Job->{'repository'};
400 # Repository can be remote or local. If remote, we'll need to fetch it
401 # to a local dir before doing `git log` et al.
404 if ($repo =~ m{://|^[^/]*:}) {
405 # $repo is a git url we can clone, like git:// or https:// or
406 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
407 # not recognized here because distinguishing that from a local
408 # path is too fragile. If you really need something strange here,
409 # use the ssh:// form.
410 $repo_location = 'remote';
411 } elsif ($repo =~ m{^\.*/}) {
412 # $repo is a local path to a git index. We'll also resolve ../foo
413 # to ../foo/.git if the latter is a directory. To help
414 # disambiguate local paths from named hosted repositories, this
415 # form must be given as ./ or ../ if it's a relative path.
416 if (-d "$repo/.git") {
417 $repo = "$repo/.git";
419 $repo_location = 'local';
421 # $repo is none of the above. It must be the name of a hosted
423 my $arv_repo_list = api_call("repositories/list",
424 'filters' => [['name','=',$repo]]);
425 my @repos_found = @{$arv_repo_list->{'items'}};
426 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
428 Log(undef, "Repository '$repo' -> "
429 . join(", ", map { $_->{'uuid'} } @repos_found));
432 croak("Error: Found $n_found repositories with name '$repo'.");
434 $repo = $repos_found[0]->{'fetch_url'};
435 $repo_location = 'remote';
437 Log(undef, "Using $repo_location repository '$repo'");
438 $ENV{"CRUNCH_SRC_URL"} = $repo;
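# Summary of the cases above (illustrative examples):
#   git://host/x.git, https://host/x.git, user@host:x.git -> 'remote'
#   /abs/path or ./relative/path (plus /.git if present)  -> 'local'
#   a bare name like "myrepo" -> looked up via the repositories API,
#   then treated as 'remote' using its fetch_url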
440 # Resolve given script_version (we'll call that $treeish here) to a
441 # commit sha1 ($commit).
442 my $treeish = $Job->{'script_version'};
444 if ($repo_location eq 'remote') {
445 # We minimize excess object-fetching by re-using the same bare
446 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
447 # just keep adding remotes to it as needed.
448 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
449 my $gitcmd = "git --git-dir=\Q$local_repo\E";
451 # Set up our local repo for caching remote objects, making
453 if (!-d $local_repo) {
454 make_path($local_repo) or croak("Error: could not create $local_repo");
456 # This works (exits 0 and doesn't delete fetched objects) even
457 # if $local_repo is already initialized:
458 `$gitcmd init --bare`;
460 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
463 # If $treeish looks like a hash (or abbrev hash) we look it up in
464 # our local cache first, since that's cheaper. (We don't want to
465 # do that with tags/branches though -- those change over time, so
466 # they should always be resolved by the remote repo.)
467 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
468 # Hide stderr because it's normal for this to fail:
469 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
471 # Careful not to resolve a branch named abcdeff to commit 1234567:
472 $sha1 =~ /^$treeish/ &&
473 $sha1 =~ /^([0-9a-f]{40})$/s) {
475 Log(undef, "Commit $commit already present in $local_repo");
479 if (!defined $commit) {
480 # If $treeish isn't just a hash or abbrev hash, or isn't here
481 # yet, we need to fetch the remote to resolve it correctly.
483 # First, remove all local heads. This prevents a name that does
484 # not exist on the remote from resolving to (or colliding with)
485 # a previously fetched branch or tag (possibly from a different
487 remove_tree("$local_repo/refs/heads", {keep_root => 1});
489 Log(undef, "Fetching objects from $repo to $local_repo");
490 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
492 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
496 # Now that the data is all here, we will use our local repo for
497 # the rest of our git activities.
501 my $gitcmd = "git --git-dir=\Q$repo\E";
502 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
503 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
504 croak("`$gitcmd rev-list` exited "
506 .", '$treeish' not found. Giving up.");
509 Log(undef, "Version $treeish is commit $commit");
511 if ($commit ne $Job->{'script_version'}) {
512 # Record the real commit id in the database, frozentokey, logs,
513 # etc. -- instead of an abbreviation or a branch name which can
514 # become ambiguous or point to a different commit in the future.
515 if (!$Job->update_attributes('script_version' => $commit)) {
516 croak("Error: failed to update job's script_version attribute");
520 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
521 $git_archive = `$gitcmd archive ''\Q$commit\E`;
523 croak("Error: $gitcmd archive exited ".exit_status_s($?));
527 if (!defined $git_archive) {
528 Log(undef, "Skip install phase (no git archive)");
530 Log(undef, "Warning: This probably means workers have no source tree!");
534 Log(undef, "Run install script on all workers");
536 my @srunargs = ("srun",
537 "--nodelist=$nodelist",
538 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
539 my @execargs = ("sh", "-c",
540 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
542 my $installpid = fork();
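# Rough sketch of the flow: the child forked here runs, via srun on every
# node, sh -c "mkdir -p ... && cd ... && perl -", sending the DATA build
# script from the end of this file followed by the git archive on stdin;
# that build script untars the archive into CRUNCH_SRC and runs any
# crunch_scripts/install hook.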
543 if ($installpid == 0)
545 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
550 last if $installpid == waitpid (-1, WNOHANG);
551 freeze_if_want_freeze ($installpid);
552 select (undef, undef, undef, 0.1);
554 Log (undef, "Install script exited ".exit_status_s($?));
559 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
560 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
563 # If this job requires a Docker image, install that.
564 my $docker_bin = "/usr/bin/docker.io";
565 my ($docker_locator, $docker_stream, $docker_hash);
566 if ($docker_locator = $Job->{docker_image_locator}) {
567 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
570 croak("No Docker image hash found from locator $docker_locator");
572 $docker_stream =~ s/^\.//;
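# Illustrative: for the default stream "." the substitution above leaves
# $docker_stream empty, so the workers below fetch "<locator>/<hash>.tar";
# for a stream "./foo" they would fetch "<locator>/foo/<hash>.tar".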
573 my $docker_install_script = qq{
574 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
575 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
578 my $docker_pid = fork();
579 if ($docker_pid == 0)
581 srun (["srun", "--nodelist=" . join(',', @node)],
582 ["/bin/sh", "-ec", $docker_install_script]);
587 last if $docker_pid == waitpid (-1, WNOHANG);
588 freeze_if_want_freeze ($docker_pid);
589 select (undef, undef, undef, 0.1);
593 croak("Installing Docker image from $docker_locator exited "
598 foreach (qw (script script_version script_parameters runtime_constraints))
602 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
604 foreach (split (/\n/, $Job->{knobs}))
606 Log (undef, "knob " . $_);
611 $main::success = undef;
617 my $thisround_succeeded = 0;
618 my $thisround_failed = 0;
619 my $thisround_failed_multiple = 0;
621 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
622 or $a <=> $b } @jobstep_todo;
623 my $level = $jobstep[$jobstep_todo[0]]->{level};
624 Log (undef, "start level $level");
629 my @freeslot = (0..$#slot);
632 my $progress_is_dirty = 1;
633 my $progress_stats_updated = 0;
635 update_progress_stats();
640 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
642 my $id = $jobstep_todo[$todo_ptr];
643 my $Jobstep = $jobstep[$id];
644 if ($Jobstep->{level} != $level)
649 pipe $reader{$id}, "writer" or croak ($!);
650 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
651 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
653 my $childslot = $freeslot[0];
654 my $childnode = $slot[$childslot]->{node};
655 my $childslotname = join (".",
656 $slot[$childslot]->{node}->{name},
657 $slot[$childslot]->{cpu});
658 my $childpid = fork();
661 $SIG{'INT'} = 'DEFAULT';
662 $SIG{'QUIT'} = 'DEFAULT';
663 $SIG{'TERM'} = 'DEFAULT';
665 foreach (values (%reader))
669 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
670 open(STDOUT,">&writer");
671 open(STDERR,">&writer");
676 delete $ENV{"GNUPGHOME"};
677 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
678 $ENV{"TASK_QSEQUENCE"} = $id;
679 $ENV{"TASK_SEQUENCE"} = $level;
680 $ENV{"JOB_SCRIPT"} = $Job->{script};
681 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
682 $param =~ tr/a-z/A-Z/;
683 $ENV{"JOB_PARAMETER_$param"} = $value;
685 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
686 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
687 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
688 $ENV{"HOME"} = $ENV{"TASK_WORK"};
689 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
690 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
691 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
692 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
698 "--nodelist=".$childnode->{name},
699 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
700 "--job-name=$job_id.$id.$$",
702 my $build_script_to_send = "";
704 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
705 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
706 ."&& cd $ENV{CRUNCH_TMP} ";
709 $build_script_to_send = $build_script;
713 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
716 my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
717 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
718 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
720 # Dynamically configure the container to use the host system as its
721 # DNS server. Get the host's global addresses from the ip command,
722 # and turn them into docker --dns options using gawk.
724 q{$(ip -o address show scope global |
725 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
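# Illustrative: if the host's only global IPv4 address is 10.0.0.5, the gawk
# match above expands to "--dns 10.0.0.5" (an address containing hex letters,
# e.g. most IPv6 literals, does not match the [0-9.:] class and is skipped).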
727 # The source tree and $destdir directory (which we have
728 # installed on the worker host) are available in the container,
729 # under the same path.
730 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
731 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
733 # Currently, we make arv-mount's mount point appear at /keep
734 # inside the container (instead of using the same path as the
735 # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
736 # crunch scripts and utilities must not rely on this. They must
737 # use $TASK_KEEPMOUNT.
738 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
739 $ENV{TASK_KEEPMOUNT} = "/keep";
741 # TASK_WORK is a plain docker data volume: it starts out empty,
742 # is writable, and persists until no containers use it any
743 # more. We don't use --volumes-from to share it with other
744 # containers: it is only accessible to this task, and it goes
745 # away when this task stops.
746 $command .= "--volume=\Q$ENV{TASK_WORK}\E ";
748 # JOB_WORK is also a plain docker data volume for now. TODO:
749 # Share a single JOB_WORK volume across all task containers on a
750 # given worker node, and delete it when the job ends (and, in
751 # case that doesn't work, when the next job starts).
752 $command .= "--volume=\Q$ENV{JOB_WORK}\E ";
754 while (my ($env_key, $env_val) = each %ENV)
756 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
757 $command .= "--env=\Q$env_key=$env_val\E ";
760 $command .= "--env=\QHOME=$ENV{HOME}\E ";
761 $command .= "\Q$docker_hash\E ";
762 $command .= "stdbuf --output=0 --error=0 ";
763 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
766 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
767 $command .= "stdbuf --output=0 --error=0 ";
768 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
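# Illustrative tail of $command in this non-Docker branch:
#   crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 \
#     stdbuf --output=0 --error=0 $CRUNCH_SRC/crunch_scripts/<script>
# appended to the task-setup prologue (and arv-mount --exec, if used).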
771 my @execargs = ('bash', '-c', $command);
772 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
773 # exec() failed, we assume nothing happened.
774 die "srun() failed on build script\n";
777 if (!defined $childpid)
784 $proc{$childpid} = { jobstep => $id,
787 jobstepname => "$job_id.$id.$childpid",
789 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
790 $slot[$childslot]->{pid} = $childpid;
792 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
793 Log ($id, "child $childpid started on $childslotname");
794 $Jobstep->{starttime} = time;
795 $Jobstep->{node} = $childnode->{name};
796 $Jobstep->{slotindex} = $childslot;
797 delete $Jobstep->{stderr};
798 delete $Jobstep->{finishtime};
800 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
801 $Jobstep->{'arvados_task'}->save;
803 splice @jobstep_todo, $todo_ptr, 1;
806 $progress_is_dirty = 1;
810 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
812 last THISROUND if $main::please_freeze;
813 if ($main::please_info)
815 $main::please_info = 0;
819 update_progress_stats();
826 check_refresh_wanted();
828 update_progress_stats();
829 select (undef, undef, undef, 0.1);
831 elsif (time - $progress_stats_updated >= 30)
833 update_progress_stats();
835 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
836 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
838 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
839 .($thisround_failed+$thisround_succeeded)
840 .") -- giving up on this round";
841 Log (undef, $message);
845 # move slots from freeslot to holdslot (or back to freeslot) if necessary
846 for (my $i=$#freeslot; $i>=0; $i--) {
847 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
848 push @holdslot, (splice @freeslot, $i, 1);
851 for (my $i=$#holdslot; $i>=0; $i--) {
852 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
853 push @freeslot, (splice @holdslot, $i, 1);
857 # give up if no nodes are succeeding
858 if (!grep { $_->{node}->{losing_streak} == 0 &&
859 $_->{node}->{hold_count} < 4 } @slot) {
860 my $message = "Every node has failed -- giving up on this round";
861 Log (undef, $message);
868 push @freeslot, splice @holdslot;
869 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
872 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
875 if ($main::please_continue) {
876 $main::please_continue = 0;
879 if ($main::please_info) { $main::please_info = 0; freeze(); collate_output(); save_meta(1); }
883 check_refresh_wanted();
885 update_progress_stats();
886 select (undef, undef, undef, 0.1);
887 killem (keys %proc) if $main::please_freeze;
891 update_progress_stats();
892 freeze_if_want_freeze();
895 if (!defined $main::success)
898 $thisround_succeeded == 0 &&
899 ($thisround_failed == 0 || $thisround_failed > 4))
901 my $message = "stop because $thisround_failed tasks failed and none succeeded";
902 Log (undef, $message);
911 goto ONELEVEL if !defined $main::success;
914 release_allocation();
916 my $collated_output = &collate_output();
918 if (!$collated_output) {
919 Log(undef, "output undef");
923 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
924 or die "failed to get collated manifest: $!";
925 my $orig_manifest_text = '';
926 while (my $manifest_line = <$orig_manifest>) {
927 $orig_manifest_text .= $manifest_line;
929 my $output = api_call("collections/create", collection => {
930 'manifest_text' => $orig_manifest_text});
931 Log(undef, "output uuid " . $output->{uuid});
932 Log(undef, "output hash " . $output->{portable_data_hash});
933 $Job->update_attributes('output' => $output->{portable_data_hash});
936 Log (undef, "Failed to register output manifest: $@");
940 Log (undef, "finish");
945 if ($collated_output && $main::success) {
946 $final_state = 'Complete';
948 $final_state = 'Failed';
950 $Job->update_attributes('state' => $final_state);
952 exit (($final_state eq 'Complete') ? 0 : 1);
956 sub update_progress_stats
958 $progress_stats_updated = time;
959 return if !$progress_is_dirty;
960 my ($todo, $done, $running) = (scalar @jobstep_todo,
961 scalar @jobstep_done,
962 scalar @slot - scalar @freeslot - scalar @holdslot);
963 $Job->{'tasks_summary'} ||= {};
964 $Job->{'tasks_summary'}->{'todo'} = $todo;
965 $Job->{'tasks_summary'}->{'done'} = $done;
966 $Job->{'tasks_summary'}->{'running'} = $running;
967 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
968 Log (undef, "status: $done done, $running running, $todo todo");
969 $progress_is_dirty = 0;
976 my $pid = waitpid (-1, WNOHANG);
977 return 0 if $pid <= 0;
979 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
981 . $slot[$proc{$pid}->{slot}]->{cpu});
982 my $jobstepid = $proc{$pid}->{jobstep};
983 my $elapsed = time - $proc{$pid}->{time};
984 my $Jobstep = $jobstep[$jobstepid];
986 my $childstatus = $?;
987 my $exitvalue = $childstatus >> 8;
988 my $exitinfo = "exit ".exit_status_s($childstatus);
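# Illustrative: $childstatus 256 (0x0100) means exit value 1; 11 (0x000b)
# means the child was killed by signal 11; exit_status_s() below renders
# these as "1" and "0 with signal 11".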
989 $Jobstep->{'arvados_task'}->reload;
990 my $task_success = $Jobstep->{'arvados_task'}->{success};
992 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
994 if (!defined $task_success) {
995 # task did not indicate one way or the other --> fail
996 $Jobstep->{'arvados_task'}->{success} = 0;
997 $Jobstep->{'arvados_task'}->save;
1004 $temporary_fail ||= $Jobstep->{node_fail};
1005 $temporary_fail ||= ($exitvalue == 111);
1007 ++$thisround_failed;
1008 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1010 # Check for signs of a failed or misconfigured node
1011 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1012 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1013 # Don't count this against jobstep failure thresholds if this
1014 # node is already suspected faulty and srun exited quickly
1015 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1017 Log ($jobstepid, "blaming failure on suspect node " .
1018 $slot[$proc{$pid}->{slot}]->{node}->{name});
1019 $temporary_fail ||= 1;
1021 ban_node_by_slot($proc{$pid}->{slot});
1024 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1025 ++$Jobstep->{'failures'},
1026 $temporary_fail ? 'temporary' : 'permanent',
1029 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1030 # Give up on this task, and the whole job
1032 $main::please_freeze = 1;
1034 # Put this task back on the todo queue
1035 push @jobstep_todo, $jobstepid;
1036 $Job->{'tasks_summary'}->{'failed'}++;
1040 ++$thisround_succeeded;
1041 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1042 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1043 push @jobstep_done, $jobstepid;
1044 Log ($jobstepid, "success in $elapsed seconds");
1046 $Jobstep->{exitcode} = $childstatus;
1047 $Jobstep->{finishtime} = time;
1048 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1049 $Jobstep->{'arvados_task'}->save;
1050 process_stderr ($jobstepid, $task_success);
1051 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1053 close $reader{$jobstepid};
1054 delete $reader{$jobstepid};
1055 delete $slot[$proc{$pid}->{slot}]->{pid};
1056 push @freeslot, $proc{$pid}->{slot};
1059 if ($task_success) {
1061 my $newtask_list = [];
1062 my $newtask_results;
1064 $newtask_results = api_call(
1067 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1069 'order' => 'qsequence',
1070 'offset' => scalar(@$newtask_list),
1072 push(@$newtask_list, @{$newtask_results->{items}});
1073 } while (@{$newtask_results->{items}});
1074 foreach my $arvados_task (@$newtask_list) {
1076 'level' => $arvados_task->{'sequence'},
1078 'arvados_task' => $arvados_task
1080 push @jobstep, $jobstep;
1081 push @jobstep_todo, $#jobstep;
1085 $progress_is_dirty = 1;
1089 sub check_refresh_wanted
1091 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1092 if (@stat && $stat[9] > $latest_refresh) {
1093 $latest_refresh = scalar time;
1094 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1095 for my $attr ('cancelled_at',
1096 'cancelled_by_user_uuid',
1097 'cancelled_by_client_uuid',
1099 $Job->{$attr} = $Job2->{$attr};
1101 if ($Job->{'state'} ne "Running") {
1102 if ($Job->{'state'} eq "Cancelled") {
1103 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1105 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1108 $main::please_freeze = 1;
1115 # return if the kill list was checked <4 seconds ago
1116 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1120 $squeue_kill_checked = time;
1122 # use killem() on procs whose killtime is reached
1125 if (exists $proc{$_}->{killtime}
1126 && $proc{$_}->{killtime} <= time)
1132 # return if the squeue was checked <60 seconds ago
1133 if (defined $squeue_checked && $squeue_checked > time - 60)
1137 $squeue_checked = time;
1141 # here is an opportunity to check for mysterious problems with local procs
1145 # get a list of steps still running
1146 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
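# Illustrative: each squeue -s line is expected to look roughly like
# "<SLURM_JOBID>.<step> <name>", where <name> is the "$job_id.$id.$pid"
# job-name given to srun above; the trailing "echo ok" distinguishes an
# empty queue from a failed squeue invocation.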
1148 if ($squeue[-1] ne "ok")
1154 # which of my jobsteps are running, according to squeue?
1158 if (/^(\d+)\.(\d+) (\S+)/)
1160 if ($1 eq $ENV{SLURM_JOBID})
1167 # which of my active child procs (>60s old) were not mentioned by squeue?
1168 foreach (keys %proc)
1170 if ($proc{$_}->{time} < time - 60
1171 && !exists $ok{$proc{$_}->{jobstepname}}
1172 && !exists $proc{$_}->{killtime})
1174 # kill this proc if it hasn't exited in 30 seconds
1175 $proc{$_}->{killtime} = time + 30;
1181 sub release_allocation
1185 Log (undef, "release job allocation");
1186 system "scancel $ENV{SLURM_JOBID}";
1194 foreach my $job (keys %reader)
1197 while (0 < sysread ($reader{$job}, $buf, 8192))
1199 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1200 $jobstep[$job]->{stderr} .= $buf;
1201 preprocess_stderr ($job);
1202 if (length ($jobstep[$job]->{stderr}) > 16384)
1204 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1213 sub preprocess_stderr
1217 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1219 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1220 Log ($job, "stderr $line");
1221 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1223 $main::please_freeze = 1;
1225 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1226 $jobstep[$job]->{node_fail} = 1;
1227 ban_node_by_slot($jobstep[$job]->{slotindex});
1236 my $task_success = shift;
1237 preprocess_stderr ($job);
1240 Log ($job, "stderr $_");
1241 } split ("\n", $jobstep[$job]->{stderr});
1247 my ($keep, $child_out, $output_block);
1249 my $cmd = "arv-get \Q$hash\E";
1250 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1254 my $bytes = sysread($keep, $buf, 1024 * 1024);
1255 if (!defined $bytes) {
1256 die "reading from arv-get: $!";
1257 } elsif ($bytes == 0) {
1258 # sysread returns 0 at the end of the pipe.
1261 # some bytes were read into buf.
1262 $output_block .= $buf;
1266 return $output_block;
1271 Log (undef, "collate");
1273 my ($child_out, $child_in);
1274 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1275 '--retries', retry_count());
1279 next if (!exists $_->{'arvados_task'}->{'output'} ||
1280 !$_->{'arvados_task'}->{'success'});
1281 my $output = $_->{'arvados_task'}->{output};
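# Illustrative: $output is either literal manifest text (fed straight to
# arv-put below) or a bare block locator such as
# "d41d8cd98f00b204e9800998ecf8427e+0", which is fetched with fetch_block()
# unless it is the only task's output.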
1282 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1284 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1285 print $child_in $output;
1287 elsif (@jobstep == 1)
1289 $joboutput = $output;
1292 elsif (defined (my $outblock = fetch_block ($output)))
1294 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1295 print $child_in $outblock;
1299 Log (undef, "XXX fetch_block($output) failed XXX");
1305 if (!defined $joboutput) {
1306 my $s = IO::Select->new($child_out);
1307 if ($s->can_read(120)) {
1308 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1310 # TODO: Ensure exit status == 0.
1312 Log (undef, "timed out reading from 'arv-put'");
1315 # TODO: kill $pid instead of waiting, now that we've decided to
1316 # ignore further output.
1327 my $sig = 2; # SIGINT first
1328 if (exists $proc{$_}->{"sent_$sig"} &&
1329 time - $proc{$_}->{"sent_$sig"} > 4)
1331 $sig = 15; # SIGTERM if SIGINT doesn't work
1333 if (exists $proc{$_}->{"sent_$sig"} &&
1334 time - $proc{$_}->{"sent_$sig"} > 4)
1336 $sig = 9; # SIGKILL if SIGTERM doesn't work
1338 if (!exists $proc{$_}->{"sent_$sig"})
1340 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1342 select (undef, undef, undef, 0.1);
1345 kill $sig, $_; # srun wants two SIGINT to really interrupt
1347 $proc{$_}->{"sent_$sig"} = time;
1348 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1358 vec($bits,fileno($_),1) = 1;
1364 # Send log output to Keep via arv-put.
1366 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1367 # $log_pipe_pid is the pid of the arv-put subprocess.
1369 # The only functions that should access these variables directly are:
1371 # log_writer_start($logfilename)
1372 # Starts an arv-put pipe, reading data on stdin and writing it to
1373 # a $logfilename file in an output collection.
1375 # log_writer_send($txt)
1376 # Writes $txt to the output log collection.
1378 # log_writer_finish()
1379 # Closes the arv-put pipe and returns the output that it produces.
1381 # log_writer_is_active()
1382 # Returns a true value if there is currently a live arv-put
1383 # process, false otherwise.
1385 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1387 sub log_writer_start($)
1389 my $logfilename = shift;
1390 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1391 'arv-put', '--portable-data-hash',
1393 '--filename', $logfilename,
1397 sub log_writer_send($)
1400 print $log_pipe_in $txt;
1403 sub log_writer_finish()
1405 return unless $log_pipe_pid;
1407 close($log_pipe_in);
1410 my $s = IO::Select->new($log_pipe_out);
1411 if ($s->can_read(120)) {
1412 sysread($log_pipe_out, $arv_put_output, 1024);
1413 chomp($arv_put_output);
1415 Log (undef, "timed out reading from 'arv-put'");
1418 waitpid($log_pipe_pid, 0);
1419 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1421 Log(undef, "log_writer_finish: arv-put exited ".exit_status_s($?))
1424 return $arv_put_output;
1427 sub log_writer_is_active() {
1428 return $log_pipe_pid;
1431 sub Log # ($jobstep_id, $logmessage)
1433 if ($_[1] =~ /\n/) {
1434 for my $line (split (/\n/, $_[1])) {
1439 my $fh = select STDERR; $|=1; select $fh;
1440 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1441 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
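# Illustrative: the substitution above octal-escapes anything outside the
# printable ASCII range, e.g. a tab byte becomes "\011", keeping each log
# entry on a single line.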
1444 if (log_writer_is_active() || -t STDERR) {
1445 my @gmtime = gmtime;
1446 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1447 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1449 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1451 if (log_writer_is_active()) {
1452 log_writer_send($datetime . " " . $message);
1459 my ($package, $file, $line) = caller;
1460 my $message = "@_ at $file line $line\n";
1461 Log (undef, $message);
1462 freeze() if @jobstep_todo;
1463 collate_output() if @jobstep_todo;
1473 if ($Job->{'state'} eq 'Cancelled') {
1474 $Job->update_attributes('finished_at' => scalar gmtime);
1476 $Job->update_attributes('state' => 'Failed');
1483 my $justcheckpoint = shift; # false if this will be the last meta saved
1484 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1485 return unless log_writer_is_active();
1487 my $loglocator = log_writer_finish();
1488 Log (undef, "log manifest is $loglocator");
1489 $Job->{'log'} = $loglocator;
1490 $Job->update_attributes('log', $loglocator);
1494 sub freeze_if_want_freeze
1496 if ($main::please_freeze)
1498 release_allocation();
1501 # kill some srun procs before freeze+stop
1502 map { $proc{$_} = {} } @_;
1505 killem (keys %proc);
1506 select (undef, undef, undef, 0.1);
1508 while (($died = waitpid (-1, WNOHANG)) > 0)
1510 delete $proc{$died};
1525 Log (undef, "Freeze not implemented");
1532 croak ("Thaw not implemented");
1548 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1555 my $srunargs = shift;
1556 my $execargs = shift;
1557 my $opts = shift || {};
1559 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1561 $Data::Dumper::Terse = 1;
1562 $Data::Dumper::Indent = 0;
1563 my $show_cmd = Dumper($args);
1564 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1565 $show_cmd =~ s/\n/ /g;
1566 warn "starting: $show_cmd\n";
1568 if (defined $stdin) {
1569 my $child = open STDIN, "-|";
1570 defined $child or die "no fork: $!";
1572 print $stdin or die $!;
1573 close STDOUT or die $!;
1578 return system (@$args) if $opts->{fork};
1581 warn "ENV size is ".length(join(" ",%ENV));
1582 die "exec failed: $!: @$args";
1586 sub ban_node_by_slot {
1587 # Don't start any new jobsteps on this node for 60 seconds
1589 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1590 $slot[$slotid]->{node}->{hold_count}++;
1591 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1596 my ($lockfile, $error_message) = @_;
1597 open L, ">", $lockfile or croak("$lockfile: $!");
1598 if (!flock L, LOCK_EX|LOCK_NB) {
1599 croak("Can't lock $lockfile: $error_message\n");
1603 sub find_docker_image {
1604 # Given a Keep locator, check to see if it contains a Docker image.
1605 # If so, return its stream name and Docker hash.
1606 # If not, return undef for both values.
1607 my $locator = shift;
1608 my ($streamname, $filename);
1609 my $image = api_call("collections/get", uuid => $locator);
1611 foreach my $line (split(/\n/, $image->{manifest_text})) {
1612 my @tokens = split(/\s+/, $line);
1614 $streamname = shift(@tokens);
1615 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1616 if (defined($filename)) {
1617 return (undef, undef); # More than one file in the Collection.
1619 $filename = (split(/:/, $filedata, 3))[2];
1624 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1625 return ($streamname, $1);
1627 return (undef, undef);
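# Illustrative: a single-file collection whose manifest reads
#   . <blockhash>+<size> 0:<size>:<64-hex-digits>.tar
# returns ('.', '<64-hex-digits>'); a collection with more than one file
# returns (undef, undef).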
1632 # Calculate the number of times an operation should be retried,
1633 # assuming exponential backoff, and that we're willing to retry as
1634 # long as tasks have been running. Enforce a minimum of 3 retries.
1635 my ($starttime, $endtime, $timediff, $retries);
1637 $starttime = $jobstep[0]->{starttime};
1638 $endtime = $jobstep[-1]->{finishtime};
1640 if (!defined($starttime)) {
1642 } elsif (!defined($endtime)) {
1643 $timediff = time - $starttime;
1645 $timediff = ($endtime - $starttime) - (time - $endtime);
1647 if ($timediff > 0) {
1648 $retries = int(log($timediff) / log(2));
1650 $retries = 1; # Use the minimum.
1652 return ($retries > 3) ? $retries : 3;
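# Illustrative: tasks that have been running for about an hour give
# int(log(3600)/log(2)) == 11 retries; anything under 16 seconds falls back
# to the minimum of 3.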
1656 # Pass in two function references.
1657 # This method will be called with the remaining arguments.
1658 # If it dies, retry it with exponential backoff until it succeeds,
1659 # or until the current retry_count is exhausted. After each failure
1660 # that can be retried, the second function will be called with
1661 # the current try count (0-based), next try time, and error message.
1662 my $operation = shift;
1663 my $retry_callback = shift;
1664 my $retries = retry_count();
1665 foreach my $try_count (0..$retries) {
1666 my $next_try = time + (2 ** $try_count);
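# Illustrative backoff: try 0 waits up to 1s before retrying, try 1 up to
# 2s, try 2 up to 4s, and so on (2**$try_count seconds).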
1667 my $result = eval { $operation->(@_); };
1670 } elsif ($try_count < $retries) {
1671 $retry_callback->($try_count, $next_try, $@);
1672 my $sleep_time = $next_try - time;
1673 sleep($sleep_time) if ($sleep_time > 0);
1676 # Ensure the error message ends in a newline, so Perl doesn't add
1677 # retry_op's line number to it.
1683 # Pass in a /-separated API method name, and arguments for it.
1684 # This function will call that method, retrying as needed until
1685 # the current retry_count is exhausted, with a log on the first failure.
1686 my $method_name = shift;
1687 my $log_api_retry = sub {
1688 my ($try_count, $next_try_at, $errmsg) = @_;
1689 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1690 $errmsg =~ s/\s/ /g;
1691 $errmsg =~ s/\s+$//;
1693 if ($next_try_at < time) {
1694 $retry_msg = "Retrying.";
1696 my $next_try_fmt = strftime("%Y-%m-%d %H:%M:%S", localtime($next_try_at));
1697 $retry_msg = "Retrying at $next_try_fmt.";
1699 Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1702 foreach my $key (split(/\//, $method_name)) {
1703 $method = $method->{$key};
1705 return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1709 # Given a $?, return a human-readable exit code string like "0" or
1710 # "1" or "0 with signal 1" or "1 with signal 11".
1711 my $exitcode = shift;
1712 my $s = $exitcode >> 8;
1713 if ($exitcode & 0x7f) {
1714 $s .= " with signal " . ($exitcode & 0x7f);
1716 if ($exitcode & 0x80) {
1717 $s .= " with core dump";
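# Illustrative: exit_status_s(0) eq "0"; exit_status_s(256) eq "1";
# exit_status_s(139) eq "0 with signal 11 with core dump" (SIGSEGV).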
1725 # checkout-and-build
1728 use File::Path qw( make_path remove_tree );
1730 my $destdir = $ENV{"CRUNCH_SRC"};
1731 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1732 my $repo = $ENV{"CRUNCH_SRC_URL"};
1733 my $task_work = $ENV{"TASK_WORK"};
1735 for my $dir ($destdir, $task_work) {
1738 -e $dir or die "Failed to create temporary directory ($dir): $!";
1743 remove_tree($task_work, {keep_root => 1});
1747 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1749 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1752 die "Cannot exec `@ARGV`: $!";
1758 unlink "$destdir.commit";
1759 open STDERR_ORIG, ">&STDERR";
1760 open STDOUT, ">", "$destdir.log";
1761 open STDERR, ">&STDOUT";
1764 my @git_archive_data = <DATA>;
1765 if (@git_archive_data) {
1766 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1767 print TARX @git_archive_data;
1769 die "'tar -C $destdir -xf -' exited $?: $!";
1774 chomp ($pwd = `pwd`);
1775 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1778 for my $src_path ("$destdir/arvados/sdk/python") {
1780 shell_or_die ("virtualenv", $install_dir);
1781 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1785 if (-e "$destdir/crunch_scripts/install") {
1786 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1787 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1789 shell_or_die ("./tests/autotests.sh", $install_dir);
1790 } elsif (-e "./install.sh") {
1791 shell_or_die ("./install.sh", $install_dir);
1795 unlink "$destdir.commit.new";
1796 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1797 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1804 die "Cannot exec `@ARGV`: $!";
1811 if ($ENV{"DEBUG"}) {
1812 print STDERR "@_\n";
1814 if (system (@_) != 0) {
1816 my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
1817 open STDERR, ">&STDERR_ORIG";
1818 system ("cat $destdir.log >&2");
1819 die "@_ failed ($err): $exitstatus";