# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-

crunch-job: Execute job steps, save snapshots as requested, collate output.

Obtain job details from Arvados, run tasks on compute nodes (typically
invoked by scheduler on controller):

 crunch-job --job x-y-z --git-dir /path/to/repo/.git

Obtain job details from command line, run tasks on local machine
(typically invoked by application or developer on VM):

 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'

 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
=item --force-unlock

If the job is already locked, steal the lock and run it anyway.

=item --git-dir

Path to a .git directory (or a git URL) where the commit given in the
job's C<script_version> attribute is to be found. If this is I<not>
given, the job's C<repository> attribute will be used.

=item --job-api-token

Arvados API authorization token to use during the course of the job.

=item --no-clear-tmp

Do not clear per-job/task temporary directories during initial job
setup. This can speed up development and debugging when running jobs
locally.

=item --job

UUID of the job to run, or a JSON-encoded job resource without a
UUID. If the latter is given, a new job object will be created.
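
For example, a JSON-encoded job resource might look like the
following (a minimal sketch; the field values are illustrative, not
taken from a real job):

 {
   "repository": "https://github.com/curoverse/arvados.git",
   "script_version": "master",
   "script": "scriptname",
   "script_parameters": {"input": "..."}
 }

C<script>, C<script_version> and C<script_parameters> must be
present; crunch-job croaks at startup if any of them is missing.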
=head1 RUNNING JOBS LOCALLY

crunch-job's log messages appear on stderr along with the job tasks'
stderr streams. The log is saved in Keep at each checkpoint and when
the job finishes.

If the job succeeds, the job's output locator is printed on stdout.

While the job is running, the following signals are accepted:

=item control-C, SIGINT, SIGQUIT

Save a checkpoint, terminate any job tasks that are running, and stop.

=item SIGALRM

Save a checkpoint and continue.

=item SIGHUP

Refresh node allocation (i.e., check whether any nodes have been added
or unallocated) and attributes of the Job record that should affect
behavior (e.g., cancel job if cancelled_at becomes non-nil).
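
For example, to checkpoint or refresh a running crunch-job from
another shell (substitute the real process id for C<PID>):

 kill -INT PID    # save a checkpoint, stop running tasks, and exit
 kill -HUP PID    # re-read node allocation and Job record attributes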
use POSIX ':sys_wait_h';
use POSIX qw(strftime);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Digest::MD5 qw(md5_hex);
use File::Path qw( make_path remove_tree );

use constant EX_TEMPFAIL => 75;

$ENV{"TMPDIR"} ||= "/tmp";
unless (defined $ENV{"CRUNCH_TMP"}) {
  $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
  if ($ENV{"USER"} ne "crunch" && $< != 0) {
    # use a tmp dir unique for my uid
    $ENV{"CRUNCH_TMP"} .= "-$<";

# Create the tmp directory if it does not exist
if ( ! -d $ENV{"CRUNCH_TMP"} ) {
  make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};

$ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
$ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
mkdir ($ENV{"JOB_WORK"});
GetOptions('force-unlock' => \$force_unlock,
           'git-dir=s' => \$git_dir,
           'job=s' => \$jobspec,
           'job-api-token=s' => \$job_api_token,
           'no-clear-tmp' => \$no_clear_tmp,
           'resume-stash=s' => \$resume_stash,

if (defined $job_api_token) {
  $ENV{ARVADOS_API_TOKEN} = $job_api_token;

my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
  $main::ENV{CRUNCH_DEBUG} = 1;
  $main::ENV{CRUNCH_DEBUG} = 0;

my $arv = Arvados->new('apiVersion' => 'v1');

my $User = api_call("users/current");

if ($jobspec =~ /^[-a-z\d]+$/)
  # $jobspec is an Arvados UUID, not a JSON job specification
  $Job = api_call("jobs/get", uuid => $jobspec);
  if (!$force_unlock) {
    # Claim this job, and make sure nobody else does
    eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
      Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
  $Job = JSON::decode_json($jobspec);
  map { croak ("No $_ specified") unless $Job->{$_} }
    qw(script script_version script_parameters);
  $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
  $Job->{'started_at'} = gmtime;
  $Job->{'state'} = 'Running';
  $Job = api_call("jobs/create", job => $Job);

$job_id = $Job->{'uuid'};
my $keep_logfile = $job_id . '.log.txt';
log_writer_start($keep_logfile);

$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};

Log (undef, "check slurm allocation");
# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
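# (In that format, "4(x3)" means three consecutive nodes with 4 tasks
# each, so "4(x3),2,4(x2)" would describe six nodes; parsing it would
# avoid shelling out to sinfo.)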
  my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
  push @sinfo, "$localcpus localhost";

if (exists $ENV{SLURM_NODELIST})
  push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;

  my ($ncpus, $slurm_nodelist) = split;
  $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
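  # Expand a SLURM-style nodelist into individual host names: a value
  # such as "compute[1-3,5],gpu1" (bracketed ranges, comma separated)
  # stands for compute1 compute2 compute3 compute5 gpu1.  (The host
  # names here are illustrative.)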
  while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
    if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
      foreach (split (",", $ranges))
        push @nodelist, map {
          $n =~ s/\[[-,\d]+\]/$_/;
      push @nodelist, $nodelist;

foreach my $nodename (@nodelist)
  Log (undef, "node $nodename - $ncpus slots");
  my $node = { name => $nodename,
  foreach my $cpu (1..$ncpus)
    push @slot, { node => $node,

push @node, @nodelist;

# Ensure that we get one jobstep running on each allocated node before
# we start overloading nodes with concurrent steps
@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
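# (e.g. with two 2-CPU nodes, all the cpu-0 slots sort ahead of the
# cpu-1 slots, so the first tasks are spread across both nodes before
# either node runs two tasks at once.)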
$Job->update_attributes(
  'tasks_summary' => { 'failed' => 0,

Log (undef, "start");

$SIG{'INT'} = sub { $main::please_freeze = 1; };
$SIG{'QUIT'} = sub { $main::please_freeze = 1; };
$SIG{'TERM'} = \&croak;
$SIG{'TSTP'} = sub { $main::please_freeze = 1; };
$SIG{'ALRM'} = sub { $main::please_info = 1; };
$SIG{'CONT'} = sub { $main::please_continue = 1; };
$SIG{'HUP'} = sub { $main::please_refresh = 1; };

$main::please_freeze = 0;
$main::please_info = 0;
$main::please_continue = 0;
$main::please_refresh = 0;
my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
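# Export any knob lines of the form "NOCACHE...=value" into the
# environment (for example, a hypothetical "NOCACHEALL=1" knob would
# become $ENV{NOCACHEALL} = "1").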
grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
$ENV{"CRUNCH_JOB_UUID"} = $job_id;
$ENV{"JOB_UUID"} = $job_id;

my @jobstep_todo = ();
my @jobstep_done = ();
my @jobstep_tomerge = ();
my $jobstep_tomerge_level = 0;
my $squeue_kill_checked;
my $latest_refresh = scalar time;

if (defined $Job->{thawedfromkey})
  thaw ($Job->{thawedfromkey});
  my $first_task = api_call("job_tasks/create", job_task => {
    'job_uuid' => $Job->{'uuid'},
  push @jobstep, { 'level' => 0,
                   'arvados_task' => $first_task,
  push @jobstep_todo, 0;

  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");

  $build_script = <DATA>;

my $nodelist = join(",", @node);

if (!defined $no_clear_tmp) {
  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
  Log (undef, "Clean work dirs");

  my $cleanpid = fork();
    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
          ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
    last if $cleanpid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($cleanpid);
    select (undef, undef, undef, 0.1);
  Log (undef, "Cleanup command exited ".exit_status_s($?));
if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
  # If script_version looks like an absolute path, *and* the --git-dir
  # argument was not given -- which implies we were not invoked by
  # crunch-dispatch -- we will use the given path as a working
  # directory instead of resolving script_version to a git commit (or
  # doing anything else with git).
  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
  $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
  # Resolve the given script_version to a git commit sha1. Also, if
  # the repository is remote, clone it into our local filesystem: this
  # ensures "git archive" will work, and is necessary to reliably
  # resolve a symbolic script_version like "master^".
  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";

  Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});

  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};

  # If we're running under crunch-dispatch, it will have already
  # pulled the appropriate source tree into its own repository, and
  # given us that repo's path as $git_dir.
  #
  # If we're running a "local" job, we might have to fetch content
  # from a remote repository.
  #
  # (Currently crunch-dispatch gives a local path with --git-dir, but
  # we might as well accept URLs there too in case it changes its
  # mind.)
  my $repo = $git_dir || $Job->{'repository'};

  # Repository can be remote or local. If remote, we'll need to fetch it
  # to a local dir before doing `git log` et al.
  if ($repo =~ m{://|^[^/]*:}) {
    # $repo is a git url we can clone, like git:// or https:// or
    # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
    # not recognized here because distinguishing that from a local
    # path is too fragile. If you really need something strange here,
    # use the ssh:// form.
    $repo_location = 'remote';
  } elsif ($repo =~ m{^\.*/}) {
    # $repo is a local path to a git index. We'll also resolve ../foo
    # to ../foo/.git if the latter is a directory. To help
    # disambiguate local paths from named hosted repositories, this
    # form must be given as ./ or ../ if it's a relative path.
    if (-d "$repo/.git") {
      $repo = "$repo/.git";
    $repo_location = 'local';
    # $repo is none of the above. It must be the name of a hosted
    # repository.
    my $arv_repo_list = api_call("repositories/list",
                                 'filters' => [['name','=',$repo]]);
    my @repos_found = @{$arv_repo_list->{'items'}};
    my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
      Log(undef, "Repository '$repo' -> "
          . join(", ", map { $_->{'uuid'} } @repos_found));
      croak("Error: Found $n_found repositories with name '$repo'.");
    $repo = $repos_found[0]->{'fetch_url'};
    $repo_location = 'remote';
  Log(undef, "Using $repo_location repository '$repo'");
  $ENV{"CRUNCH_SRC_URL"} = $repo;

  # Resolve given script_version (we'll call that $treeish here) to a
  # commit sha1 ($commit).
  my $treeish = $Job->{'script_version'};

  if ($repo_location eq 'remote') {
    # We minimize excess object-fetching by re-using the same bare
    # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
    # just keep adding remotes to it as needed.
    my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
    my $gitcmd = "git --git-dir=\Q$local_repo\E";

    # Set up our local repo for caching remote objects, making
    if (!-d $local_repo) {
      make_path($local_repo) or croak("Error: could not create $local_repo");
    # This works (exits 0 and doesn't delete fetched objects) even
    # if $local_repo is already initialized:
    `$gitcmd init --bare`;
      croak("Error: $gitcmd init --bare exited ".exit_status_s($?));

    # If $treeish looks like a hash (or abbrev hash) we look it up in
    # our local cache first, since that's cheaper. (We don't want to
    # do that with tags/branches though -- those change over time, so
    # they should always be resolved by the remote repo.)
    if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
      # Hide stderr because it's normal for this to fail:
      my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
        # Careful not to resolve a branch named abcdeff to commit 1234567:
          $sha1 =~ /^$treeish/ &&
          $sha1 =~ /^([0-9a-f]{40})$/s) {
        Log(undef, "Commit $commit already present in $local_repo");

    if (!defined $commit) {
      # If $treeish isn't just a hash or abbrev hash, or isn't here
      # yet, we need to fetch the remote to resolve it correctly.

      # First, remove all local heads. This prevents a name that does
      # not exist on the remote from resolving to (or colliding with)
      # a previously fetched branch or tag (possibly from a different
      remove_tree("$local_repo/refs/heads", {keep_root => 1});

      Log(undef, "Fetching objects from $repo to $local_repo");
      `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
        croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));

    # Now that the data is all here, we will use our local repo for
    # the rest of our git activities.

  my $gitcmd = "git --git-dir=\Q$repo\E";
  my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
  unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
    croak("`$gitcmd rev-list` exited "
          .", '$treeish' not found. Giving up.");
  Log(undef, "Version $treeish is commit $commit");

  if ($commit ne $Job->{'script_version'}) {
    # Record the real commit id in the database, frozentokey, logs,
    # etc. -- instead of an abbreviation or a branch name which can
    # become ambiguous or point to a different commit in the future.
    if (!$Job->update_attributes('script_version' => $commit)) {
      croak("Error: failed to update job's script_version attribute");

  $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
  $git_archive = `$gitcmd archive ''\Q$commit\E`;
    croak("Error: $gitcmd archive exited ".exit_status_s($?));

if (!defined $git_archive) {
  Log(undef, "Skip install phase (no git archive)");
    Log(undef, "Warning: This probably means workers have no source tree!");
  Log(undef, "Run install script on all workers");

  my @srunargs = ("srun",
                  "--nodelist=$nodelist",
                  "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
  my @execargs = ("sh", "-c",
                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");

  my $installpid = fork();
  if ($installpid == 0)
    srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
    last if $installpid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($installpid);
    select (undef, undef, undef, 0.1);
  my $install_exited = $?;
  Log (undef, "Install script exited ".exit_status_s($install_exited));
  exit (1) if $install_exited != 0;

  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");

# If this job requires a Docker image, install that.
my $docker_bin = "/usr/bin/docker.io";
my ($docker_locator, $docker_stream, $docker_hash);
if ($docker_locator = $Job->{docker_image_locator}) {
  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
    croak("No Docker image hash found from locator $docker_locator");
  $docker_stream =~ s/^\.//;
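  # Load the image on each node, but only if `docker images` does not
  # already list this hash; otherwise the arv-get | docker load step
  # would be wasted work.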
  my $docker_install_script = qq{
if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load

  my $docker_pid = fork();
  if ($docker_pid == 0)
    srun (["srun", "--nodelist=" . join(',', @node)],
          ["/bin/sh", "-ec", $docker_install_script]);
    last if $docker_pid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($docker_pid);
    select (undef, undef, undef, 0.1);
    croak("Installing Docker image from $docker_locator exited "

foreach (qw (script script_version script_parameters runtime_constraints))
       (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
foreach (split (/\n/, $Job->{knobs}))
  Log (undef, "knob " . $_);

$main::success = undef;

my $thisround_succeeded = 0;
my $thisround_failed = 0;
my $thisround_failed_multiple = 0;

@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};
Log (undef, "start level $level");
my @freeslot = (0..$#slot);
my $progress_is_dirty = 1;
my $progress_stats_updated = 0;

update_progress_stats();

for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
  my $id = $jobstep_todo[$todo_ptr];
  my $Jobstep = $jobstep[$id];
  if ($Jobstep->{level} != $level)

  pipe $reader{$id}, "writer" or croak ($!);
  my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
  fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);

  my $childslot = $freeslot[0];
  my $childnode = $slot[$childslot]->{node};
  my $childslotname = join (".",
                            $slot[$childslot]->{node}->{name},
                            $slot[$childslot]->{cpu});

  my $childpid = fork();
    $SIG{'INT'} = 'DEFAULT';
    $SIG{'QUIT'} = 'DEFAULT';
    $SIG{'TERM'} = 'DEFAULT';
    foreach (values (%reader))
    fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
    open(STDOUT,">&writer");
    open(STDERR,">&writer");

    delete $ENV{"GNUPGHOME"};
    $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
    $ENV{"TASK_QSEQUENCE"} = $id;
    $ENV{"TASK_SEQUENCE"} = $level;
    $ENV{"JOB_SCRIPT"} = $Job->{script};
    while (my ($param, $value) = each %{$Job->{script_parameters}}) {
      $param =~ tr/a-z/A-Z/;
      $ENV{"JOB_PARAMETER_$param"} = $value;
    $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
    $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
    $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
    $ENV{"HOME"} = $ENV{"TASK_WORK"};
    $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
    $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
    $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
    $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};

      "--nodelist=".$childnode->{name},
      qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
      "--job-name=$job_id.$id.$$",

      "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
      ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
      ."&& cd $ENV{CRUNCH_TMP} ";
    $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
      my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";

      # Dynamically configure the container to use the host system as its
      # DNS server. Get the host's global addresses from the ip command,
      # and turn them into docker --dns options using gawk.
      q{$(ip -o address show scope global |
          gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };

      # The source tree and $destdir directory (which we have
      # installed on the worker host) are available in the container,
      # under the same path.
      $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
      $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";

      # Currently, we make arv-mount's mount point appear at /keep
      # inside the container (instead of using the same path as the
      # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
      # crunch scripts and utilities must not rely on this. They must
      # use $TASK_KEEPMOUNT.
      $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
      $ENV{TASK_KEEPMOUNT} = "/keep";

      # TASK_WORK is almost exactly like a docker data volume: it
      # starts out empty, is writable, and persists until no
      # containers use it any more. We don't use --volumes-from to
      # share it with other containers: it is only accessible to this
      # task, and it goes away when this task stops.
      #
      # However, a docker data volume is writable only by root unless
      # the mount point already happens to exist in the container with
      # different permissions. Therefore, we [1] assume /tmp already
      # exists in the image and is writable by the crunch user; [2]
      # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
      # writable if they are created by docker while setting up the
      # other --volumes); and [3] create $TASK_WORK inside the
      # container using $build_script.
      $command .= "--volume=/tmp ";
      $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
      $ENV{"HOME"} = $ENV{"TASK_WORK"};
      $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated

      # TODO: Share a single JOB_WORK volume across all task
      # containers on a given worker node, and delete it when the job
      # ends (and, in case that doesn't work, when the next job
      #
      # For now, use the same approach as TASK_WORK above.
      $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";

      while (my ($env_key, $env_val) = each %ENV)
        if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
          $command .= "--env=\Q$env_key=$env_val\E ";
      $command .= "--env=\QHOME=$ENV{HOME}\E ";
      $command .= "\Q$docker_hash\E ";
      $command .= "stdbuf --output=0 --error=0 ";
      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
      $command .= "stdbuf --output=0 --error=0 ";
      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};

    my @execargs = ('bash', '-c', $command);
    srun (\@srunargs, \@execargs, undef, $build_script);
    # exec() failed, we assume nothing happened.
    die "srun() failed on build script\n";

  if (!defined $childpid)

  $proc{$childpid} = { jobstep => $id,
                       jobstepname => "$job_id.$id.$childpid",
  croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
  $slot[$childslot]->{pid} = $childpid;

  Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
  Log ($id, "child $childpid started on $childslotname");
  $Jobstep->{starttime} = time;
  $Jobstep->{node} = $childnode->{name};
  $Jobstep->{slotindex} = $childslot;
  delete $Jobstep->{stderr};
  delete $Jobstep->{finishtime};

  $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
  $Jobstep->{'arvados_task'}->save;

  splice @jobstep_todo, $todo_ptr, 1;

  $progress_is_dirty = 1;

         (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
    last THISROUND if $main::please_freeze;
    if ($main::please_info)
      $main::please_info = 0;
      create_output_collection();
      update_progress_stats();
      check_refresh_wanted();
      update_progress_stats();
      select (undef, undef, undef, 0.1);
    elsif (time - $progress_stats_updated >= 30)
      update_progress_stats();
    if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
        ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
      my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
          .($thisround_failed+$thisround_succeeded)
          .") -- giving up on this round";
      Log (undef, $message);

    # move slots from freeslot to holdslot (or back to freeslot) if necessary
    for (my $i=$#freeslot; $i>=0; $i--) {
      if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
        push @holdslot, (splice @freeslot, $i, 1);
    for (my $i=$#holdslot; $i>=0; $i--) {
      if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
        push @freeslot, (splice @holdslot, $i, 1);

    # give up if no nodes are succeeding
    if (!grep { $_->{node}->{losing_streak} == 0 &&
                $_->{node}->{hold_count} < 4 } @slot) {
      my $message = "Every node has failed -- giving up on this round";
      Log (undef, $message);

push @freeslot, splice @holdslot;
map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
  if ($main::please_continue) {
    $main::please_continue = 0;
  $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;

    check_refresh_wanted();
    update_progress_stats();
    select (undef, undef, undef, 0.1);
    killem (keys %proc) if $main::please_freeze;

update_progress_stats();
freeze_if_want_freeze();

if (!defined $main::success)
      $thisround_succeeded == 0 &&
      ($thisround_failed == 0 || $thisround_failed > 4))
    my $message = "stop because $thisround_failed tasks failed and none succeeded";
    Log (undef, $message);

goto ONELEVEL if !defined $main::success;

release_allocation();
my $collated_output = &create_output_collection();

if (!$collated_output) {
  Log (undef, "Failed to write output collection");
  Log(undef, "output hash " . $collated_output);
  $Job->update_attributes('output' => $collated_output);

Log (undef, "finish");

if ($collated_output && $main::success) {
  $final_state = 'Complete';
  $final_state = 'Failed';
$Job->update_attributes('state' => $final_state);

exit (($final_state eq 'Complete') ? 0 : 1);
sub update_progress_stats
  $progress_stats_updated = time;
  return if !$progress_is_dirty;
  my ($todo, $done, $running) = (scalar @jobstep_todo,
                                 scalar @jobstep_done,
                                 scalar @slot - scalar @freeslot - scalar @holdslot);
  $Job->{'tasks_summary'} ||= {};
  $Job->{'tasks_summary'}->{'todo'} = $todo;
  $Job->{'tasks_summary'}->{'done'} = $done;
  $Job->{'tasks_summary'}->{'running'} = $running;
  $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
  Log (undef, "status: $done done, $running running, $todo todo");
  $progress_is_dirty = 0;

  my $pid = waitpid (-1, WNOHANG);
  return 0 if $pid <= 0;

  my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
                  . $slot[$proc{$pid}->{slot}]->{cpu});
  my $jobstepid = $proc{$pid}->{jobstep};
  my $elapsed = time - $proc{$pid}->{time};
  my $Jobstep = $jobstep[$jobstepid];

  my $childstatus = $?;
  my $exitvalue = $childstatus >> 8;
  my $exitinfo = "exit ".exit_status_s($childstatus);
  $Jobstep->{'arvados_task'}->reload;
  my $task_success = $Jobstep->{'arvados_task'}->{success};

  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");

  if (!defined $task_success) {
    # task did not indicate one way or the other --> fail
    $Jobstep->{'arvados_task'}->{success} = 0;
    $Jobstep->{'arvados_task'}->save;

    $temporary_fail ||= $Jobstep->{node_fail};
    $temporary_fail ||= ($exitvalue == 111);
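    # (An exit value of 111 is treated as a request to retry: the
    # failure is recorded as temporary rather than permanent.)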
    ++$thisround_failed;
    ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;

    # Check for signs of a failed or misconfigured node
    if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
        2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
      # Don't count this against jobstep failure thresholds if this
      # node is already suspected faulty and srun exited quickly
      if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
        Log ($jobstepid, "blaming failure on suspect node " .
             $slot[$proc{$pid}->{slot}]->{node}->{name});
        $temporary_fail ||= 1;
      ban_node_by_slot($proc{$pid}->{slot});

    Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                             ++$Jobstep->{'failures'},
                             $temporary_fail ? 'temporary ' : 'permanent',

    if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
      # Give up on this task, and the whole job
      $main::please_freeze = 1;
      # Put this task back on the todo queue
      push @jobstep_todo, $jobstepid;
    $Job->{'tasks_summary'}->{'failed'}++;

    ++$thisround_succeeded;
    $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
    $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
    push @jobstep_done, $jobstepid;
    Log ($jobstepid, "success in $elapsed seconds");

  $Jobstep->{exitcode} = $childstatus;
  $Jobstep->{finishtime} = time;
  $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
  $Jobstep->{'arvados_task'}->save;
  process_stderr ($jobstepid, $task_success);
  Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});

  close $reader{$jobstepid};
  delete $reader{$jobstepid};
  delete $slot[$proc{$pid}->{slot}]->{pid};
  push @freeslot, $proc{$pid}->{slot};

  if ($task_success) {
    my $newtask_list = [];
    my $newtask_results;
      $newtask_results = api_call(
          'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
        'order' => 'qsequence',
        'offset' => scalar(@$newtask_list),
      push(@$newtask_list, @{$newtask_results->{items}});
    } while (@{$newtask_results->{items}});
    foreach my $arvados_task (@$newtask_list) {
        'level' => $arvados_task->{'sequence'},
        'arvados_task' => $arvados_task
      push @jobstep, $jobstep;
      push @jobstep_todo, $#jobstep;

  $progress_is_dirty = 1;
sub check_refresh_wanted
  my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
  if (@stat && $stat[9] > $latest_refresh) {
    $latest_refresh = scalar time;
    my $Job2 = api_call("jobs/get", uuid => $jobspec);
    for my $attr ('cancelled_at',
                  'cancelled_by_user_uuid',
                  'cancelled_by_client_uuid',
      $Job->{$attr} = $Job2->{$attr};
    if ($Job->{'state'} ne "Running") {
      if ($Job->{'state'} eq "Cancelled") {
        Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
        Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
      $main::please_freeze = 1;

  # return if the kill list was checked <4 seconds ago
  if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
  $squeue_kill_checked = time;

  # use killem() on procs whose killtime is reached
    if (exists $proc{$_}->{killtime}
        && $proc{$_}->{killtime} <= time)

  # return if the squeue was checked <60 seconds ago
  if (defined $squeue_checked && $squeue_checked > time - 60)
  $squeue_checked = time;

    # here is an opportunity to check for mysterious problems with local procs

  # get a list of steps still running
  my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
  if ($squeue[-1] ne "ok")

  # which of my jobsteps are running, according to squeue?
    if (/^(\d+)\.(\d+) (\S+)/)
      if ($1 eq $ENV{SLURM_JOBID})

  # which of my active child procs (>60s old) were not mentioned by squeue?
  foreach (keys %proc)
    if ($proc{$_}->{time} < time - 60
        && !exists $ok{$proc{$_}->{jobstepname}}
        && !exists $proc{$_}->{killtime})
      # kill this proc if it hasn't exited in 30 seconds
      $proc{$_}->{killtime} = time + 30;
sub release_allocation
    Log (undef, "release job allocation");
    system "scancel $ENV{SLURM_JOBID}";

  foreach my $job (keys %reader)
    while (0 < sysread ($reader{$job}, $buf, 8192))
      print STDERR $buf if $ENV{CRUNCH_DEBUG};
      $jobstep[$job]->{stderr} .= $buf;
      preprocess_stderr ($job);
      if (length ($jobstep[$job]->{stderr}) > 16384)
        substr ($jobstep[$job]->{stderr}, 0, 8192) = "";

sub preprocess_stderr
  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
    Log ($job, "stderr $line");
    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
      $main::please_freeze = 1;
    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
      $jobstep[$job]->{node_fail} = 1;
      ban_node_by_slot($jobstep[$job]->{slotindex});

  my $task_success = shift;
  preprocess_stderr ($job);
    Log ($job, "stderr $_");
  } split ("\n", $jobstep[$job]->{stderr});

  my ($keep, $child_out, $output_block);
  my $cmd = "arv-get \Q$hash\E";
  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
    my $bytes = sysread($keep, $buf, 1024 * 1024);
    if (!defined $bytes) {
      die "reading from arv-get: $!";
    } elsif ($bytes == 0) {
      # sysread returns 0 at the end of the pipe.
      # some bytes were read into buf.
      $output_block .= $buf;
  return $output_block;
# create_output_collection generates a new collection containing the
# output of each successfully completed task, and returns the
# portable_data_hash for the new collection.

sub create_output_collection
  Log (undef, "collate");

  my ($child_out, $child_in);
  my $pid = open2($child_out, $child_in, 'python', '-c',
      'import arvados; ' .
      'print arvados.api()' .
      '.create(body={"manifest_text":sys.stdin.read()})' .
      '.execute()["portable_data_hash"]'

    next if (!exists $_->{'arvados_task'}->{'output'} ||
             !$_->{'arvados_task'}->{'success'});
    my $output = $_->{'arvados_task'}->{output};
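    # A task's output is either literal manifest text or a Keep block
    # locator (32 hex digits plus optional hints); locators are
    # fetched with arv-get so that manifest text is what gets piped to
    # the python child in either case.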
    if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
      print $child_in $output;
    elsif (defined (my $outblock = fetch_block ($output)))
      print $child_in $outblock;
      Log (undef, "XXX fetch_block($output) failed XXX");

  my $s = IO::Select->new($child_out);
  if ($s->can_read(120)) {
    sysread($child_out, $joboutput, 64 * 1024 * 1024);
    # TODO: Ensure exit status == 0.
    Log (undef, "timed out while creating output collection");
    # TODO: kill $pid instead of waiting, now that we've decided to
    # ignore further output.

    my $sig = 2; # SIGINT first
    if (exists $proc{$_}->{"sent_$sig"} &&
        time - $proc{$_}->{"sent_$sig"} > 4)
      $sig = 15; # SIGTERM if SIGINT doesn't work
    if (exists $proc{$_}->{"sent_$sig"} &&
        time - $proc{$_}->{"sent_$sig"} > 4)
      $sig = 9; # SIGKILL if SIGTERM doesn't work
    if (!exists $proc{$_}->{"sent_$sig"})
      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
      select (undef, undef, undef, 0.1);
      kill $sig, $_; # srun wants two SIGINT to really interrupt
      $proc{$_}->{"sent_$sig"} = time;
      $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};

    vec($bits,fileno($_),1) = 1;
# Send log output to Keep via arv-put.
#
# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
# $log_pipe_pid is the pid of the arv-put subprocess.
#
# The only functions that should access these variables directly are:
#
# log_writer_start($logfilename)
#     Starts an arv-put pipe, reading data on stdin and writing it to
#     a $logfilename file in an output collection.
#
# log_writer_send($txt)
#     Writes $txt to the output log collection.
#
# log_writer_finish()
#     Closes the arv-put pipe and returns the output that it produces.
#
# log_writer_is_active()
#     Returns a true value if there is currently a live arv-put
#     process, false otherwise.

my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);

sub log_writer_start($)
  my $logfilename = shift;
  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
                        'arv-put', '--portable-data-hash',
                        '--filename', $logfilename,

sub log_writer_send($)
  print $log_pipe_in $txt;

sub log_writer_finish()
  return unless $log_pipe_pid;

  close($log_pipe_in);

  my $s = IO::Select->new($log_pipe_out);
  if ($s->can_read(120)) {
    sysread($log_pipe_out, $arv_put_output, 1024);
    chomp($arv_put_output);
    Log (undef, "timed out reading from 'arv-put'");

  waitpid($log_pipe_pid, 0);
  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
  Log("log_writer_finish: arv-put exited ".exit_status_s($?))

  return $arv_put_output;

sub log_writer_is_active() {
  return $log_pipe_pid;

sub Log # ($jobstep_id, $logmessage)
  if ($_[1] =~ /\n/) {
    for my $line (split (/\n/, $_[1])) {
  my $fh = select STDERR; $|=1; select $fh;
  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
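  # Octal-escape anything outside printable ASCII (space through "~")
  # so each log entry stays on a single printable line.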
  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;

  if (log_writer_is_active() || -t STDERR) {
    my @gmtime = gmtime;
    $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
                         $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
  print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);

  if (log_writer_is_active()) {
    log_writer_send($datetime . " " . $message);

  my ($package, $file, $line) = caller;
  my $message = "@_ at $file line $line\n";
  Log (undef, $message);
  freeze() if @jobstep_todo;
  create_output_collection() if @jobstep_todo;

    if ($Job->{'state'} eq 'Cancelled') {
      $Job->update_attributes('finished_at' => scalar gmtime);
      $Job->update_attributes('state' => 'Failed');

  my $justcheckpoint = shift; # false if this will be the last meta saved
  return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
  return unless log_writer_is_active();

  my $loglocator = log_writer_finish();
  Log (undef, "log manifest is $loglocator");
  $Job->{'log'} = $loglocator;
  $Job->update_attributes('log', $loglocator);

sub freeze_if_want_freeze
  if ($main::please_freeze)
    release_allocation();
      # kill some srun procs before freeze+stop
      map { $proc{$_} = {} } @_;
      killem (keys %proc);
      select (undef, undef, undef, 0.1);
      while (($died = waitpid (-1, WNOHANG)) > 0)
        delete $proc{$died};
    create_output_collection();

    Log (undef, "Freeze not implemented");

  croak ("Thaw not implemented");

  $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
  my $srunargs = shift;
  my $execargs = shift;
  my $opts = shift || {};

  my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;

  $Data::Dumper::Terse = 1;
  $Data::Dumper::Indent = 0;
  my $show_cmd = Dumper($args);
  $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
  $show_cmd =~ s/\n/ /g;
  warn "starting: $show_cmd\n";

  if (defined $stdin) {
    my $child = open STDIN, "-|";
    defined $child or die "no fork: $!";
      print $stdin or die $!;
      close STDOUT or die $!;

  return system (@$args) if $opts->{fork};

  warn "ENV size is ".length(join(" ",%ENV));
  die "exec failed: $!: @$args";

sub ban_node_by_slot {
  # Don't start any new jobsteps on this node for 60 seconds
  $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
  $slot[$slotid]->{node}->{hold_count}++;
  Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");

  my ($lockfile, $error_message) = @_;
  open L, ">", $lockfile or croak("$lockfile: $!");
  if (!flock L, LOCK_EX|LOCK_NB) {
    croak("Can't lock $lockfile: $error_message\n");

sub find_docker_image {
  # Given a Keep locator, check to see if it contains a Docker image.
  # If so, return its stream name and Docker hash.
  # If not, return undef for both values.
  my $locator = shift;
  my ($streamname, $filename);
  my $image = api_call("collections/get", uuid => $locator);
    foreach my $line (split(/\n/, $image->{manifest_text})) {
      my @tokens = split(/\s+/, $line);
        $streamname = shift(@tokens);
        foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
          if (defined($filename)) {
            return (undef, undef); # More than one file in the Collection.
            $filename = (split(/:/, $filedata, 3))[2];
  if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
    return ($streamname, $1);
    return (undef, undef);

  # Calculate the number of times an operation should be retried,
  # assuming exponential backoff, and that we're willing to retry as
  # long as tasks have been running. Enforce a minimum of 3 retries.
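  # (For example, if tasks have been running for about 1000 seconds,
  # int(log(1000)/log(2)) gives 9 retries; anything that works out to
  # fewer than 3 is bumped up to the minimum.)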
  my ($starttime, $endtime, $timediff, $retries);
    $starttime = $jobstep[0]->{starttime};
    $endtime = $jobstep[-1]->{finishtime};
  if (!defined($starttime)) {
  } elsif (!defined($endtime)) {
    $timediff = time - $starttime;
    $timediff = ($endtime - $starttime) - (time - $endtime);
  if ($timediff > 0) {
    $retries = int(log($timediff) / log(2));
    $retries = 1; # Use the minimum.
  return ($retries > 3) ? $retries : 3;
  # Pass in two function references.
  # The first function will be called with the remaining arguments.
  # If it dies, retry it with exponential backoff until it succeeds,
  # or until the current retry_count is exhausted. After each failure
  # that can be retried, the second function will be called with
  # the current try count (0-based), next try time, and error message.
  my $operation = shift;
  my $retry_callback = shift;
  my $retries = retry_count();
  foreach my $try_count (0..$retries) {
    my $next_try = time + (2 ** $try_count);
    my $result = eval { $operation->(@_); };
    } elsif ($try_count < $retries) {
      $retry_callback->($try_count, $next_try, $@);
      my $sleep_time = $next_try - time;
      sleep($sleep_time) if ($sleep_time > 0);
  # Ensure the error message ends in a newline, so Perl doesn't add
  # retry_op's line number to it.

  # Pass in a /-separated API method name, and arguments for it.
  # This function will call that method, retrying as needed until
  # the current retry_count is exhausted, with a log on the first failure.
  my $method_name = shift;
  my $log_api_retry = sub {
    my ($try_count, $next_try_at, $errmsg) = @_;
    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
    $errmsg =~ s/\s/ /g;
    $errmsg =~ s/\s+$//;
    if ($next_try_at < time) {
      $retry_msg = "Retrying.";
      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
      $retry_msg = "Retrying at $next_try_fmt.";
    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
  foreach my $key (split(/\//, $method_name)) {
    $method = $method->{$key};
  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);

  # Given a $?, return a human-readable exit code string like "0" or
  # "1" or "0 with signal 1" or "1 with signal 11".
  my $exitcode = shift;
  my $s = $exitcode >> 8;
  if ($exitcode & 0x7f) {
    $s .= " with signal " . ($exitcode & 0x7f);
  if ($exitcode & 0x80) {
    $s .= " with core dump";

# checkout-and-build
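# (This is the build script that the main program reads from its own
# DATA handle and pipes, followed by the git archive, to "perl -" on
# each worker via srun: it extracts the archive into CRUNCH_SRC,
# installs it under CRUNCH_INSTALL, and finally execs any command
# line arguments it was given.)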
use File::Path qw( make_path remove_tree );

my $destdir = $ENV{"CRUNCH_SRC"};
my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
my $job_work = $ENV{"JOB_WORK"};
my $task_work = $ENV{"TASK_WORK"};

for my $dir ($destdir, $job_work, $task_work) {
    -e $dir or die "Failed to create temporary directory ($dir): $!";

  remove_tree($task_work, {keep_root => 1});

my @git_archive_data = <DATA>;
if (!@git_archive_data) {
  # Nothing to extract -> nothing to install.
  run_argv_and_exit();

open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
  # This version already installed -> nothing to do.
  run_argv_and_exit();

unlink "$destdir.commit";
open STDERR_ORIG, ">&STDERR";
open STDOUT, ">", "$destdir.log";
open STDERR, ">&STDOUT";

open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
print TARX @git_archive_data;
  die "'tar -C $destdir -xf -' exited $?: $!";

chomp ($pwd = `pwd`);
my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";

for my $src_path ("$destdir/arvados/sdk/python") {
    shell_or_die ("virtualenv", $install_dir);
    shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");

if (-e "$destdir/crunch_scripts/install") {
  shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
  shell_or_die ("./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
  shell_or_die ("./install.sh", $install_dir);

  unlink "$destdir.commit.new";
  symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
  rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";

run_argv_and_exit();

sub run_argv_and_exit
  die "Cannot exec `@ARGV`: $!";

  if ($ENV{"DEBUG"}) {
    print STDERR "@_\n";
  if (system (@_) != 0) {
    my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
    open STDERR, ">&STDERR_ORIG";
    system ("cat $destdir.log >&2");
    die "@_ failed ($err): $exitstatus";