2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
71 Save a checkpoint and continue.
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
90 use Digest::MD5 qw(md5_hex);
96 use File::Path qw( make_path remove_tree );
98 use constant EX_TEMPFAIL => 75;
# Set up per-job temporary/work directories via environment variables.
# NOTE(review): this span is an elided extract — closing braces and some
# statements between the numbered lines are missing from this view.
100 $ENV{"TMPDIR"} ||= "/tmp";
101 unless (defined $ENV{"CRUNCH_TMP"}) {
102 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# When not running as the "crunch" user or root, suffix the tmp dir with
# the real uid ($<) so concurrent users on one host don't collide.
103 if ($ENV{"USER"} ne "crunch" && $< != 0) {
104 # use a tmp dir unique for my uid
105 $ENV{"CRUNCH_TMP"} .= "-$<";
109 # Create the tmp directory if it does not exist
110 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
111 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Derived paths exported for job tasks: JOB_WORK is the shared work area,
# CRUNCH_INSTALL is where the source install lands; CRUNCH_WORK is kept
# only for backward compatibility.
114 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
115 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
116 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# mkdir failure is not checked here; tasks that need JOB_WORK will fail
# later if it could not be created — TODO confirm intended.
117 mkdir ($ENV{"JOB_WORK"});
# Parse command-line options (see POD above for their meanings).
# NOTE(review): the closing of the GetOptions call and of the following
# `if` block are elided from this view.
125 GetOptions('force-unlock' => \$force_unlock,
126 'git-dir=s' => \$git_dir,
127 'job=s' => \$jobspec,
128 'job-api-token=s' => \$job_api_token,
129 'no-clear-tmp' => \$no_clear_tmp,
130 'resume-stash=s' => \$resume_stash,
# A token supplied on the command line overrides the ambient
# ARVADOS_API_TOKEN for the duration of this job.
133 if (defined $job_api_token) {
134 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
137 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
143 $main::ENV{CRUNCH_DEBUG} = 1;
147 $main::ENV{CRUNCH_DEBUG} = 0;
152 my $arv = Arvados->new('apiVersion' => 'v1');
160 my $User = api_call("users/current");
162 if ($jobspec =~ /^[-a-z\d]+$/)
164 # $jobspec is an Arvados UUID, not a JSON job specification
165 $Job = api_call("jobs/get", uuid => $jobspec);
166 if (!$force_unlock) {
167 # Claim this job, and make sure nobody else does
168 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
170 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
177 $Job = JSON::decode_json($jobspec);
181 map { croak ("No $_ specified") unless $Job->{$_} }
182 qw(script script_version script_parameters);
185 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186 $Job->{'started_at'} = gmtime;
187 $Job->{'state'} = 'Running';
189 $Job = api_call("jobs/create", job => $Job);
191 $job_id = $Job->{'uuid'};
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
201 Log (undef, "check slurm allocation");
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209 push @sinfo, "$localcpus localhost";
211 if (exists $ENV{SLURM_NODELIST})
213 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 my ($ncpus, $slurm_nodelist) = split;
218 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
221 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
224 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
227 foreach (split (",", $ranges))
240 push @nodelist, map {
242 $n =~ s/\[[-,\d]+\]/$_/;
249 push @nodelist, $nodelist;
252 foreach my $nodename (@nodelist)
254 Log (undef, "node $nodename - $ncpus slots");
255 my $node = { name => $nodename,
259 foreach my $cpu (1..$ncpus)
261 push @slot, { node => $node,
265 push @node, @nodelist;
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
276 $Job->update_attributes(
277 'tasks_summary' => { 'failed' => 0,
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
313 if (defined $Job->{thawedfromkey})
315 thaw ($Job->{thawedfromkey});
319 my $first_task = api_call("job_tasks/create", job_task => {
320 'job_uuid' => $Job->{'uuid'},
325 push @jobstep, { 'level' => 0,
327 'arvados_task' => $first_task,
329 push @jobstep_todo, 0;
335 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
342 $build_script = <DATA>;
344 my $nodelist = join(",", @node);
346 if (!defined $no_clear_tmp) {
347 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
348 Log (undef, "Clean work dirs");
350 my $cleanpid = fork();
353 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
354 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
359 last if $cleanpid == waitpid (-1, WNOHANG);
360 freeze_if_want_freeze ($cleanpid);
361 select (undef, undef, undef, 0.1);
363 Log (undef, "Cleanup command exited ".exit_status_s($?));
368 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
369 # If script_version looks like an absolute path, *and* the --git-dir
370 # argument was not given -- which implies we were not invoked by
371 # crunch-dispatch -- we will use the given path as a working
372 # directory instead of resolving script_version to a git commit (or
373 # doing anything else with git).
374 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
375 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
378 # Resolve the given script_version to a git commit sha1. Also, if
379 # the repository is remote, clone it into our local filesystem: this
380 # ensures "git archive" will work, and is necessary to reliably
381 # resolve a symbolic script_version like "master^".
382 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
384 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
386 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
388 # If we're running under crunch-dispatch, it will have already
389 # pulled the appropriate source tree into its own repository, and
390 # given us that repo's path as $git_dir.
392 # If we're running a "local" job, we might have to fetch content
393 # from a remote repository.
395 # (Currently crunch-dispatch gives a local path with --git-dir, but
396 # we might as well accept URLs there too in case it changes its
398 my $repo = $git_dir || $Job->{'repository'};
400 # Repository can be remote or local. If remote, we'll need to fetch it
401 # to a local dir before doing `git log` et al.
404 if ($repo =~ m{://|^[^/]*:}) {
405 # $repo is a git url we can clone, like git:// or https:// or
406 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
407 # not recognized here because distinguishing that from a local
408 # path is too fragile. If you really need something strange here,
409 # use the ssh:// form.
410 $repo_location = 'remote';
411 } elsif ($repo =~ m{^\.*/}) {
412 # $repo is a local path to a git index. We'll also resolve ../foo
413 # to ../foo/.git if the latter is a directory. To help
414 # disambiguate local paths from named hosted repositories, this
415 # form must be given as ./ or ../ if it's a relative path.
416 if (-d "$repo/.git") {
417 $repo = "$repo/.git";
419 $repo_location = 'local';
421 # $repo is none of the above. It must be the name of a hosted
423 my $arv_repo_list = api_call("repositories/list",
424 'filters' => [['name','=',$repo]]);
425 my @repos_found = @{$arv_repo_list->{'items'}};
426 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
428 Log(undef, "Repository '$repo' -> "
429 . join(", ", map { $_->{'uuid'} } @repos_found));
432 croak("Error: Found $n_found repositories with name '$repo'.");
434 $repo = $repos_found[0]->{'fetch_url'};
435 $repo_location = 'remote';
437 Log(undef, "Using $repo_location repository '$repo'");
438 $ENV{"CRUNCH_SRC_URL"} = $repo;
440 # Resolve given script_version (we'll call that $treeish here) to a
441 # commit sha1 ($commit).
442 my $treeish = $Job->{'script_version'};
444 if ($repo_location eq 'remote') {
445 # We minimize excess object-fetching by re-using the same bare
446 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
447 # just keep adding remotes to it as needed.
448 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
449 my $gitcmd = "git --git-dir=\Q$local_repo\E";
451 # Set up our local repo for caching remote objects, making
453 if (!-d $local_repo) {
454 make_path($local_repo) or croak("Error: could not create $local_repo");
456 # This works (exits 0 and doesn't delete fetched objects) even
457 # if $local_repo is already initialized:
458 `$gitcmd init --bare`;
460 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
463 # If $treeish looks like a hash (or abbrev hash) we look it up in
464 # our local cache first, since that's cheaper. (We don't want to
465 # do that with tags/branches though -- those change over time, so
466 # they should always be resolved by the remote repo.)
467 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
468 # Hide stderr because it's normal for this to fail:
469 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
471 # Careful not to resolve a branch named abcdeff to commit 1234567:
472 $sha1 =~ /^$treeish/ &&
473 $sha1 =~ /^([0-9a-f]{40})$/s) {
475 Log(undef, "Commit $commit already present in $local_repo");
479 if (!defined $commit) {
480 # If $treeish isn't just a hash or abbrev hash, or isn't here
481 # yet, we need to fetch the remote to resolve it correctly.
483 # First, remove all local heads. This prevents a name that does
484 # not exist on the remote from resolving to (or colliding with)
485 # a previously fetched branch or tag (possibly from a different
487 remove_tree("$local_repo/refs/heads", {keep_root => 1});
489 Log(undef, "Fetching objects from $repo to $local_repo");
490 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
492 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
496 # Now that the data is all here, we will use our local repo for
497 # the rest of our git activities.
501 my $gitcmd = "git --git-dir=\Q$repo\E";
502 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
503 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
504 croak("`$gitcmd rev-list` exited "
506 .", '$treeish' not found. Giving up.");
509 Log(undef, "Version $treeish is commit $commit");
511 if ($commit ne $Job->{'script_version'}) {
512 # Record the real commit id in the database, frozentokey, logs,
513 # etc. -- instead of an abbreviation or a branch name which can
514 # become ambiguous or point to a different commit in the future.
515 if (!$Job->update_attributes('script_version' => $commit)) {
516 croak("Error: failed to update job's script_version attribute");
520 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
521 $git_archive = `$gitcmd archive ''\Q$commit\E`;
523 croak("Error: $gitcmd archive exited ".exit_status_s($?));
527 if (!defined $git_archive) {
528 Log(undef, "Skip install phase (no git archive)");
530 Log(undef, "Warning: This probably means workers have no source tree!");
534 Log(undef, "Run install script on all workers");
536 my @srunargs = ("srun",
537 "--nodelist=$nodelist",
538 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
539 my @execargs = ("sh", "-c",
540 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
542 my $installpid = fork();
543 if ($installpid == 0)
545 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
550 last if $installpid == waitpid (-1, WNOHANG);
551 freeze_if_want_freeze ($installpid);
552 select (undef, undef, undef, 0.1);
554 my $install_exited = $?;
555 Log (undef, "Install script exited ".exit_status_s($install_exited));
556 exit (1) if $install_exited != 0;
561 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
562 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
565 # If this job requires a Docker image, install that.
566 my $docker_bin = "/usr/bin/docker.io";
567 my ($docker_locator, $docker_stream, $docker_hash);
568 if ($docker_locator = $Job->{docker_image_locator}) {
569 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
572 croak("No Docker image hash found from locator $docker_locator");
574 $docker_stream =~ s/^\.//;
575 my $docker_install_script = qq{
576 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
577 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
580 my $docker_pid = fork();
581 if ($docker_pid == 0)
583 srun (["srun", "--nodelist=" . join(',', @node)],
584 ["/bin/sh", "-ec", $docker_install_script]);
589 last if $docker_pid == waitpid (-1, WNOHANG);
590 freeze_if_want_freeze ($docker_pid);
591 select (undef, undef, undef, 0.1);
595 croak("Installing Docker image from $docker_locator exited "
600 foreach (qw (script script_version script_parameters runtime_constraints))
604 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
606 foreach (split (/\n/, $Job->{knobs}))
608 Log (undef, "knob " . $_);
613 $main::success = undef;
619 my $thisround_succeeded = 0;
620 my $thisround_failed = 0;
621 my $thisround_failed_multiple = 0;
623 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
624 or $a <=> $b } @jobstep_todo;
625 my $level = $jobstep[$jobstep_todo[0]]->{level};
626 Log (undef, "start level $level");
631 my @freeslot = (0..$#slot);
634 my $progress_is_dirty = 1;
635 my $progress_stats_updated = 0;
637 update_progress_stats();
642 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
644 my $id = $jobstep_todo[$todo_ptr];
645 my $Jobstep = $jobstep[$id];
646 if ($Jobstep->{level} != $level)
651 pipe $reader{$id}, "writer" or croak ($!);
652 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
653 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
655 my $childslot = $freeslot[0];
656 my $childnode = $slot[$childslot]->{node};
657 my $childslotname = join (".",
658 $slot[$childslot]->{node}->{name},
659 $slot[$childslot]->{cpu});
660 my $childpid = fork();
663 $SIG{'INT'} = 'DEFAULT';
664 $SIG{'QUIT'} = 'DEFAULT';
665 $SIG{'TERM'} = 'DEFAULT';
667 foreach (values (%reader))
671 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
672 open(STDOUT,">&writer");
673 open(STDERR,">&writer");
678 delete $ENV{"GNUPGHOME"};
679 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
680 $ENV{"TASK_QSEQUENCE"} = $id;
681 $ENV{"TASK_SEQUENCE"} = $level;
682 $ENV{"JOB_SCRIPT"} = $Job->{script};
683 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
684 $param =~ tr/a-z/A-Z/;
685 $ENV{"JOB_PARAMETER_$param"} = $value;
687 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
688 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
689 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
690 $ENV{"HOME"} = $ENV{"TASK_WORK"};
691 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
692 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
693 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
694 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
700 "--nodelist=".$childnode->{name},
701 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
702 "--job-name=$job_id.$id.$$",
705 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
706 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
707 ."&& cd $ENV{CRUNCH_TMP} ";
708 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
711 my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
712 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
713 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
715 # Dynamically configure the container to use the host system as its
716 # DNS server. Get the host's global addresses from the ip command,
717 # and turn them into docker --dns options using gawk.
719 q{$(ip -o address show scope global |
720 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
722 # The source tree and $destdir directory (which we have
723 # installed on the worker host) are available in the container,
724 # under the same path.
725 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
726 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
728 # Currently, we make arv-mount's mount point appear at /keep
729 # inside the container (instead of using the same path as the
730 # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
731 # crunch scripts and utilities must not rely on this. They must
732 # use $TASK_KEEPMOUNT.
733 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
734 $ENV{TASK_KEEPMOUNT} = "/keep";
736 # TASK_WORK is almost exactly like a docker data volume: it
737 # starts out empty, is writable, and persists until no
738 # containers use it any more. We don't use --volumes-from to
739 # share it with other containers: it is only accessible to this
740 # task, and it goes away when this task stops.
742 # However, a docker data volume is writable only by root unless
743 # the mount point already happens to exist in the container with
744 # different permissions. Therefore, we [1] assume /tmp already
745 # exists in the image and is writable by the crunch user; [2]
746 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
747 # writable if they are created by docker while setting up the
748 # other --volumes); and [3] create $TASK_WORK inside the
749 # container using $build_script.
750 $command .= "--volume=/tmp ";
751 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
752 $ENV{"HOME"} = $ENV{"TASK_WORK"};
753 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
755 # TODO: Share a single JOB_WORK volume across all task
756 # containers on a given worker node, and delete it when the job
757 # ends (and, in case that doesn't work, when the next job
760 # For now, use the same approach as TASK_WORK above.
761 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
763 while (my ($env_key, $env_val) = each %ENV)
765 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
766 $command .= "--env=\Q$env_key=$env_val\E ";
769 $command .= "--env=\QHOME=$ENV{HOME}\E ";
770 $command .= "\Q$docker_hash\E ";
771 $command .= "stdbuf --output=0 --error=0 ";
772 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
775 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
776 $command .= "stdbuf --output=0 --error=0 ";
777 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
780 my @execargs = ('bash', '-c', $command);
781 srun (\@srunargs, \@execargs, undef, $build_script);
782 # exec() failed, we assume nothing happened.
783 die "srun() failed on build script\n";
786 if (!defined $childpid)
793 $proc{$childpid} = { jobstep => $id,
796 jobstepname => "$job_id.$id.$childpid",
798 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
799 $slot[$childslot]->{pid} = $childpid;
801 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
802 Log ($id, "child $childpid started on $childslotname");
803 $Jobstep->{starttime} = time;
804 $Jobstep->{node} = $childnode->{name};
805 $Jobstep->{slotindex} = $childslot;
806 delete $Jobstep->{stderr};
807 delete $Jobstep->{finishtime};
809 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
810 $Jobstep->{'arvados_task'}->save;
812 splice @jobstep_todo, $todo_ptr, 1;
815 $progress_is_dirty = 1;
819 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
821 last THISROUND if $main::please_freeze;
822 if ($main::please_info)
824 $main::please_info = 0;
828 update_progress_stats();
835 check_refresh_wanted();
837 update_progress_stats();
838 select (undef, undef, undef, 0.1);
840 elsif (time - $progress_stats_updated >= 30)
842 update_progress_stats();
844 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
845 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
847 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
848 .($thisround_failed+$thisround_succeeded)
849 .") -- giving up on this round";
850 Log (undef, $message);
854 # move slots from freeslot to holdslot (or back to freeslot) if necessary
855 for (my $i=$#freeslot; $i>=0; $i--) {
856 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
857 push @holdslot, (splice @freeslot, $i, 1);
860 for (my $i=$#holdslot; $i>=0; $i--) {
861 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
862 push @freeslot, (splice @holdslot, $i, 1);
866 # give up if no nodes are succeeding
867 if (!grep { $_->{node}->{losing_streak} == 0 &&
868 $_->{node}->{hold_count} < 4 } @slot) {
869 my $message = "Every node has failed -- giving up on this round";
870 Log (undef, $message);
877 push @freeslot, splice @holdslot;
878 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
881 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
884 if ($main::please_continue) {
885 $main::please_continue = 0;
888 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
892 check_refresh_wanted();
894 update_progress_stats();
895 select (undef, undef, undef, 0.1);
896 killem (keys %proc) if $main::please_freeze;
900 update_progress_stats();
901 freeze_if_want_freeze();
904 if (!defined $main::success)
907 $thisround_succeeded == 0 &&
908 ($thisround_failed == 0 || $thisround_failed > 4))
910 my $message = "stop because $thisround_failed tasks failed and none succeeded";
911 Log (undef, $message);
920 goto ONELEVEL if !defined $main::success;
923 release_allocation();
925 my $collated_output = &collate_output();
927 if (!$collated_output) {
928 Log(undef, "output undef");
932 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
933 or die "failed to get collated manifest: $!";
934 my $orig_manifest_text = '';
935 while (my $manifest_line = <$orig_manifest>) {
936 $orig_manifest_text .= $manifest_line;
938 my $output = api_call("collections/create", collection => {
939 'manifest_text' => $orig_manifest_text});
940 Log(undef, "output uuid " . $output->{uuid});
941 Log(undef, "output hash " . $output->{portable_data_hash});
942 $Job->update_attributes('output' => $output->{portable_data_hash});
945 Log (undef, "Failed to register output manifest: $@");
949 Log (undef, "finish");
954 if ($collated_output && $main::success) {
955 $final_state = 'Complete';
957 $final_state = 'Failed';
959 $Job->update_attributes('state' => $final_state);
961 exit (($final_state eq 'Complete') ? 0 : 1);
# update_progress_stats: push current task counts (todo/done/running) to the
# Job record's tasks_summary and log a one-line status. Skips the API write
# when nothing changed since the last update ($progress_is_dirty false).
# Uses file-scoped state: @jobstep_todo, @jobstep_done, @slot, @freeslot,
# @holdslot, $Job, $progress_is_dirty, $progress_stats_updated.
965 sub update_progress_stats
967 $progress_stats_updated = time;
968 return if !$progress_is_dirty;
# running = total slots minus free minus on-hold slots.
969 my ($todo, $done, $running) = (scalar @jobstep_todo,
970 scalar @jobstep_done,
971 scalar @slot - scalar @freeslot - scalar @holdslot);
972 $Job->{'tasks_summary'} ||= {};
973 $Job->{'tasks_summary'}->{'todo'} = $todo;
974 $Job->{'tasks_summary'}->{'done'} = $done;
975 $Job->{'tasks_summary'}->{'running'} = $running;
976 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
977 Log (undef, "status: $done done, $running running, $todo todo");
978 $progress_is_dirty = 0;
985 my $pid = waitpid (-1, WNOHANG);
986 return 0 if $pid <= 0;
988 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
990 . $slot[$proc{$pid}->{slot}]->{cpu});
991 my $jobstepid = $proc{$pid}->{jobstep};
992 my $elapsed = time - $proc{$pid}->{time};
993 my $Jobstep = $jobstep[$jobstepid];
995 my $childstatus = $?;
996 my $exitvalue = $childstatus >> 8;
997 my $exitinfo = "exit ".exit_status_s($childstatus);
998 $Jobstep->{'arvados_task'}->reload;
999 my $task_success = $Jobstep->{'arvados_task'}->{success};
1001 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1003 if (!defined $task_success) {
1004 # task did not indicate one way or the other --> fail
1005 $Jobstep->{'arvados_task'}->{success} = 0;
1006 $Jobstep->{'arvados_task'}->save;
1013 $temporary_fail ||= $Jobstep->{node_fail};
1014 $temporary_fail ||= ($exitvalue == 111);
1016 ++$thisround_failed;
1017 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1019 # Check for signs of a failed or misconfigured node
1020 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1021 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1022 # Don't count this against jobstep failure thresholds if this
1023 # node is already suspected faulty and srun exited quickly
1024 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1026 Log ($jobstepid, "blaming failure on suspect node " .
1027 $slot[$proc{$pid}->{slot}]->{node}->{name});
1028 $temporary_fail ||= 1;
1030 ban_node_by_slot($proc{$pid}->{slot});
1033 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1034 ++$Jobstep->{'failures'},
1035 $temporary_fail ? 'temporary ' : 'permanent',
1038 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1039 # Give up on this task, and the whole job
1041 $main::please_freeze = 1;
1043 # Put this task back on the todo queue
1044 push @jobstep_todo, $jobstepid;
1045 $Job->{'tasks_summary'}->{'failed'}++;
1049 ++$thisround_succeeded;
1050 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1051 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1052 push @jobstep_done, $jobstepid;
1053 Log ($jobstepid, "success in $elapsed seconds");
1055 $Jobstep->{exitcode} = $childstatus;
1056 $Jobstep->{finishtime} = time;
1057 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1058 $Jobstep->{'arvados_task'}->save;
1059 process_stderr ($jobstepid, $task_success);
1060 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1062 close $reader{$jobstepid};
1063 delete $reader{$jobstepid};
1064 delete $slot[$proc{$pid}->{slot}]->{pid};
1065 push @freeslot, $proc{$pid}->{slot};
1068 if ($task_success) {
1070 my $newtask_list = [];
1071 my $newtask_results;
1073 $newtask_results = api_call(
1076 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1078 'order' => 'qsequence',
1079 'offset' => scalar(@$newtask_list),
1081 push(@$newtask_list, @{$newtask_results->{items}});
1082 } while (@{$newtask_results->{items}});
1083 foreach my $arvados_task (@$newtask_list) {
1085 'level' => $arvados_task->{'sequence'},
1087 'arvados_task' => $arvados_task
1089 push @jobstep, $jobstep;
1090 push @jobstep_todo, $#jobstep;
1094 $progress_is_dirty = 1;
# check_refresh_wanted: if the refresh-trigger file has been touched since
# our last refresh (mtime, stat field 9), re-fetch the Job record and copy
# over cancellation-related attributes. If the job is no longer Running
# (e.g. Cancelled), log why and request a freeze so the main loop stops.
# NOTE(review): closing braces and possibly further attributes in the
# for-list are elided from this view.
1098 sub check_refresh_wanted
1100 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1101 if (@stat && $stat[9] > $latest_refresh) {
1102 $latest_refresh = scalar time;
# Re-read the job from the API server to pick up external state changes.
1103 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1104 for my $attr ('cancelled_at',
1105 'cancelled_by_user_uuid',
1106 'cancelled_by_client_uuid',
1108 $Job->{$attr} = $Job2->{$attr};
1110 if ($Job->{'state'} ne "Running") {
1111 if ($Job->{'state'} eq "Cancelled") {
1112 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1114 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
# Either way, stop launching work and checkpoint.
1117 $main::please_freeze = 1;
1124 # return if the kill list was checked <4 seconds ago
1125 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1129 $squeue_kill_checked = time;
1131 # use killem() on procs whose killtime is reached
1134 if (exists $proc{$_}->{killtime}
1135 && $proc{$_}->{killtime} <= time)
1141 # return if the squeue was checked <60 seconds ago
1142 if (defined $squeue_checked && $squeue_checked > time - 60)
1146 $squeue_checked = time;
1150 # here is an opportunity to check for mysterious problems with local procs
1154 # get a list of steps still running
1155 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1157 if ($squeue[-1] ne "ok")
1163 # which of my jobsteps are running, according to squeue?
1167 if (/^(\d+)\.(\d+) (\S+)/)
1169 if ($1 eq $ENV{SLURM_JOBID})
1176 # which of my active child procs (>60s old) were not mentioned by squeue?
1177 foreach (keys %proc)
1179 if ($proc{$_}->{time} < time - 60
1180 && !exists $ok{$proc{$_}->{jobstepname}}
1181 && !exists $proc{$_}->{killtime})
1183 # kill this proc if it hasn't exited in 30 seconds
1184 $proc{$_}->{killtime} = time + 30;
# release_allocation: give back the SLURM node allocation for this job by
# cancelling it with scancel. NOTE(review): lines between the sub header and
# body are elided — presumably a guard for $have_slurm; verify in full file.
1190 sub release_allocation
1194 Log (undef, "release job allocation");
1195 system "scancel $ENV{SLURM_JOBID}";
1203 foreach my $job (keys %reader)
1206 while (0 < sysread ($reader{$job}, $buf, 8192))
1208 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1209 $jobstep[$job]->{stderr} .= $buf;
1210 preprocess_stderr ($job);
1211 if (length ($jobstep[$job]->{stderr}) > 16384)
1213 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: consume complete lines from a jobstep's buffered stderr,
# log each one, and react to known srun error messages: an expired/unconfirmed
# SLURM allocation triggers a freeze; a node failure marks the step as a
# node_fail and bans the offending slot's node. Partial (newline-less) data
# stays in the buffer for the next call.
# NOTE(review): the line that captures $line from $1 and the closing braces
# are elided from this view.
1222 sub preprocess_stderr
1226 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the consumed line (including its trailing newline) from the buffer.
1228 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1229 Log ($job, "stderr $line");
1230 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1232 $main::please_freeze = 1;
1234 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1235 $jobstep[$job]->{node_fail} = 1;
1236 ban_node_by_slot($jobstep[$job]->{slotindex});
1245 my $task_success = shift;
1246 preprocess_stderr ($job);
1249 Log ($job, "stderr $_");
1250 } split ("\n", $jobstep[$job]->{stderr});
# fetch_block fragment: read the content behind a Keep locator by piping
# from "arv-get <hash>", and return the whole content as one string.
1256 my ($keep, $child_out, $output_block);
# \Q...\E quotemeta's $hash so metacharacters can't affect the shell command.
1258 my $cmd = "arv-get \Q$hash\E";
1259 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1263 my $bytes = sysread($keep, $buf, 1024 * 1024);
1264 if (!defined $bytes) {
1265 die "reading from arv-get: $!";
1266 } elsif ($bytes == 0) {
1267 # sysread returns 0 at the end of the pipe.
1270 # some bytes were read into buf.
1271 $output_block .= $buf;
1275 return $output_block;
# collate_output fragment: combine each successful task's output into a
# single job output manifest by streaming manifest text into
# "arv-put --raw" and reading back the resulting locator.
1280 Log (undef, "collate");
1282 my ($child_out, $child_in);
1283 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1284 '--retries', retry_count());
# Skip tasks that produced no output or did not succeed.
1288 next if (!exists $_->{'arvados_task'}->{'output'} ||
1289 !$_->{'arvados_task'}->{'success'});
1290 my $output = $_->{'arvados_task'}->{output};
# Three cases for a task's output: (1) full manifest text -> pipe it
# through as-is; (2) a single bare block locator when this is the job's
# only step -> use it directly; (3) a locator whose content must be
# fetched from Keep and then piped through.
1291 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1293 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1294 print $child_in $output;
1296 elsif (@jobstep == 1)
1298 $joboutput = $output;
1301 elsif (defined (my $outblock = fetch_block ($output)))
1303 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1304 print $child_in $outblock;
1308 Log (undef, "XXX fetch_block($output) failed XXX");
# Wait up to 2 minutes for arv-put to print the combined manifest locator.
1314 if (!defined $joboutput) {
1315 my $s = IO::Select->new($child_out);
1316 if ($s->can_read(120)) {
1317 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1319 # TODO: Ensure exit status == 0.
1321 Log (undef, "timed out reading from 'arv-put'");
1324 # TODO: kill $pid instead of waiting, now that we've decided to
1325 # ignore further output.
# killem fragment: signal the given pids, escalating INT -> TERM -> KILL,
# moving to the stronger signal only if ~4s have passed since the
# previous level was sent (tracked per-pid in $proc{$pid}{"sent_N"}).
1336 my $sig = 2; # SIGINT first
1337 if (exists $proc{$_}->{"sent_$sig"} &&
1338 time - $proc{$_}->{"sent_$sig"} > 4)
1340 $sig = 15; # SIGTERM if SIGINT doesn't work
1342 if (exists $proc{$_}->{"sent_$sig"} &&
1343 time - $proc{$_}->{"sent_$sig"} > 4)
1345 $sig = 9; # SIGKILL if SIGTERM doesn't work
# Send each signal level only once per pid.
1347 if (!exists $proc{$_}->{"sent_$sig"})
1349 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1351 select (undef, undef, undef, 0.1);
1354 kill $sig, $_; # srun wants two SIGINT to really interrupt
# Record when this level was sent so the next pass can escalate.
1356 $proc{$_}->{"sent_$sig"} = time;
1357 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# fhbits fragment: set the bit for each filehandle's fd in a select() bit vector.
1367 vec($bits,fileno($_),1) = 1;
1373 # Send log output to Keep via arv-put.
1375 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1376 # $log_pipe_pid is the pid of the arv-put subprocess.
1378 # The only functions that should access these variables directly are:
1380 # log_writer_start($logfilename)
1381 # Starts an arv-put pipe, reading data on stdin and writing it to
1382 # a $logfilename file in an output collection.
1384 # log_writer_send($txt)
1385 # Writes $txt to the output log collection.
1387 # log_writer_finish()
1388 # Closes the arv-put pipe and returns the output that it produces.
1390 # log_writer_is_active()
1391 # Returns a true value if there is currently a live arv-put
1392 # process, false otherwise.
1394 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1396 sub log_writer_start($)
# Spawn the arv-put child that reads log text on its stdin and stores it
# as $logfilename in a collection; it prints the collection's portable
# data hash when its stdin is closed (read back in log_writer_finish).
1398 my $logfilename = shift;
1399 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1400 'arv-put', '--portable-data-hash',
1402 '--filename', $logfilename,
1406 sub log_writer_send($)
# Append $txt to the log collection by writing it to the arv-put pipe.
1409 print $log_pipe_in $txt;
1412 sub log_writer_finish()
# Close the arv-put pipe, wait up to 2 minutes for the locator it prints,
# reap the child, reset the writer state, and return the locator text.
1414 return unless $log_pipe_pid;
# Closing stdin tells arv-put to finalize the collection and print its hash.
1416 close($log_pipe_in);
1419 my $s = IO::Select->new($log_pipe_out);
1420 if ($s->can_read(120)) {
1421 sysread($log_pipe_out, $arv_put_output, 1024);
1422 chomp($arv_put_output);
1424 Log (undef, "timed out reading from 'arv-put'");
1427 waitpid($log_pipe_pid, 0);
# Clear state so log_writer_is_active() reports false from here on.
1428 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
# NOTE(review): Log() is called everywhere else as Log($jobstep, $msg);
# this single-argument call puts the message in the jobstep slot and
# leaves the message argument undef — looks like a latent bug, confirm.
1430 Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1433 return $arv_put_output;
# True while an arv-put log-writer child is running (pid is set).
1436 sub log_writer_is_active() {
1437 return $log_pipe_pid;
1440 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are logged one line at a time.
1442 if ($_[1] =~ /\n/) {
1443 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered (autoflush), restoring the selected handle after.
1448 my $fh = select STDERR; $|=1; select $fh;
1449 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable bytes as \NNN octal so each log entry stays one line.
1450 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1453 if (log_writer_is_active() || -t STDERR) {
1454 my @gmtime = gmtime;
1455 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1456 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Timestamp prefix for interactive stderr; the Keep log always gets it.
1458 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1460 if (log_writer_is_active()) {
1461 log_writer_send($datetime . " " . $message);
# croak fragment: log a fatal message with the caller's file/line, then
# checkpoint and collate outstanding work before the process dies.
1468 my ($package, $file, $line) = caller;
1469 my $message = "@_ at $file line $line\n";
1470 Log (undef, $message);
# Only freeze/collate if there was still work queued.
1471 freeze() if @jobstep_todo;
1472 collate_output() if @jobstep_todo;
# cleanup fragment: finalize the Job record — an already-Cancelled job
# just gets finished_at stamped; anything else ends up Failed.
1482 if ($Job->{'state'} eq 'Cancelled') {
1483 $Job->update_attributes('finished_at' => scalar gmtime);
1485 $Job->update_attributes('state' => 'Failed');
# save_meta fragment: flush the log collection to Keep and record its
# manifest locator in the Job's "log" attribute.
1492 my $justcheckpoint = shift; # false if this will be the last meta saved
1493 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1494 return unless log_writer_is_active();
1496 my $loglocator = log_writer_finish();
1497 Log (undef, "log manifest is $loglocator");
1498 $Job->{'log'} = $loglocator;
# NOTE(review): positional args here vs. fat-comma style ('log' => ...)
# used in other update_attributes calls — same list either way, but
# inconsistent.
1499 $Job->update_attributes('log', $loglocator);
1503 sub freeze_if_want_freeze
# If a freeze was requested (e.g. via SIGINT setting $main::please_freeze),
# release the SLURM allocation, kill the caller-supplied pids plus every
# tracked child in %proc, and reap them before freezing.
1505 if ($main::please_freeze)
1507 release_allocation();
1510 # kill some srun procs before freeze+stop
# Register the extra pids in %proc (empty metadata) so killem sees them.
1511 map { $proc{$_} = {} } @_;
1514 killem (keys %proc);
1515 select (undef, undef, undef, 0.1);
# Reap every exited child without blocking, dropping each from %proc.
1517 while (($died = waitpid (-1, WNOHANG)) > 0)
1519 delete $proc{$died};
# freeze()/thaw() are stubs left over from the pre-Arvados Warehouse era.
1534 Log (undef, "Freeze not implemented");
1541 croak ("Thaw not implemented");
# freezeunquote fragment: undo freeze-quoting ("\n" -> newline, "\X" -> X).
1557 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# srun fragment: run @$execargs, prefixed with srun and @$srunargs when
# running under SLURM; otherwise run the bare command locally. Logs a
# token-scrubbed rendering of the command first.
1564 my $srunargs = shift;
1565 my $execargs = shift;
1566 my $opts = shift || {};
1568 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Render the argument list compactly on one line for the log.
1570 $Data::Dumper::Terse = 1;
1571 $Data::Dumper::Indent = 0;
1572 my $show_cmd = Dumper($args);
# Redact anything of the form ...TOKEN=<value> before logging.
1573 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1574 $show_cmd =~ s/\n/ /g;
1575 warn "starting: $show_cmd\n";
# When $stdin was supplied (shifted in an elided line above), feed it to
# the command through a forked writer child attached to our STDIN.
1577 if (defined $stdin) {
1578 my $child = open STDIN, "-|";
1579 defined $child or die "no fork: $!";
1581 print $stdin or die $!;
1582 close STDOUT or die $!;
1587 return system (@$args) if $opts->{fork};
# Reaching past exec means exec itself failed (E2BIG is a common cause,
# hence logging the environment size).
1590 warn "ENV size is ".length(join(" ",%ENV));
1591 die "exec failed: $!: @$args";
# Temporarily blacklist the node behind a slot after a failure, and count
# how many times it has been held.
1595 sub ban_node_by_slot {
1596 # Don't start any new jobsteps on this node for 60 seconds
1598 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1599 $slot[$slotid]->{node}->{hold_count}++;
1600 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Lock-helper fragment: take a non-blocking exclusive flock on $lockfile
# or croak with the supplied message.
1605 my ($lockfile, $error_message) = @_;
# NOTE(review): bareword global filehandle L — it must remain open for
# the life of the process so the flock is held; a lexical handle going
# out of scope would silently release the lock.
1606 open L, ">", $lockfile or croak("$lockfile: $!");
1607 if (!flock L, LOCK_EX|LOCK_NB) {
1608 croak("Can't lock $lockfile: $error_message\n");
1612 sub find_docker_image {
1613 # Given a Keep locator, check to see if it contains a Docker image.
1614 # If so, return its stream name and Docker hash.
1615 # If not, return undef for both values.
1616 my $locator = shift;
1617 my ($streamname, $filename);
1618 my $image = api_call("collections/get", uuid => $locator);
1620 foreach my $line (split(/\n/, $image->{manifest_text})) {
1621 my @tokens = split(/\s+/, $line);
# The first token of a manifest line is the stream name.
1623 $streamname = shift(@tokens);
# File tokens have the form "pos:size:filename".
1624 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1625 if (defined($filename)) {
1626 return (undef, undef); # More than one file in the Collection.
# Keep only the filename portion (third field of pos:size:name).
1628 $filename = (split(/:/, $filedata, 3))[2];
# A valid Docker image collection holds exactly one file, <64-hex-id>.tar;
# the captured hex id is the Docker image hash.
1633 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1634 return ($streamname, $1);
1636 return (undef, undef);
1641 # Calculate the number of times an operation should be retried,
1642 # assuming exponential backoff, and that we're willing to retry as
1643 # long as tasks have been running. Enforce a minimum of 3 retries.
1644 my ($starttime, $endtime, $timediff, $retries);
1646 $starttime = $jobstep[0]->{starttime};
1647 $endtime = $jobstep[-1]->{finishtime};
1649 if (!defined($starttime)) {
1651 } elsif (!defined($endtime)) {
# Job still running: the retry budget grows with elapsed runtime.
1652 $timediff = time - $starttime;
# Job finished: total runtime minus time already elapsed since the end.
1654 $timediff = ($endtime - $starttime) - (time - $endtime);
1656 if ($timediff > 0) {
# log2(runtime): cumulative exponential backoff then stays on the order
# of the runtime itself.
1657 $retries = int(log($timediff) / log(2));
1659 $retries = 1; # Use the minimum.
1661 return ($retries > 3) ? $retries : 3;
1665 # Pass in two function references.
1666 # This method will be called with the remaining arguments.
1667 # If it dies, retry it with exponential backoff until it succeeds,
1668 # or until the current retry_count is exhausted. After each failure
1669 # that can be retried, the second function will be called with
1670 # the current try count (0-based), next try time, and error message.
1671 my $operation = shift;
1672 my $retry_callback = shift;
1673 my $retries = retry_count();
1674 foreach my $try_count (0..$retries) {
# Exponential backoff: 1s, 2s, 4s, ... between successive attempts.
1675 my $next_try = time + (2 ** $try_count);
1676 my $result = eval { $operation->(@_); };
1679 } elsif ($try_count < $retries) {
1680 $retry_callback->($try_count, $next_try, $@);
# Sleep only for whatever part of the backoff window remains.
1681 my $sleep_time = $next_try - time;
1682 sleep($sleep_time) if ($sleep_time > 0);
1685 # Ensure the error message ends in a newline, so Perl doesn't add
1686 # retry_op's line number to it.
1692 # Pass in a /-separated API method name, and arguments for it.
1693 # This function will call that method, retrying as needed until
1694 # the current retry_count is exhausted, with a log on the first failure.
1695 my $method_name = shift;
1696 my $log_api_retry = sub {
1697 my ($try_count, $next_try_at, $errmsg) = @_;
# Strip Perl's "at <script> line N." suffix and collapse all whitespace.
1698 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1699 $errmsg =~ s/\s/ /g;
1700 $errmsg =~ s/\s+$//;
1702 if ($next_try_at < time) {
1703 $retry_msg = "Retrying.";
# FIXME: POSIX::strftime expects a broken-down time list
# (sec, min, hour, mday, mon, year, ...), not an epoch — this should be
# strftime("%Y-%m-%d %H:%M:%S", localtime($next_try_at)). As written the
# formatted timestamp is garbage.
1705 my $next_try_fmt = strftime("%Y-%m-%d %H:%M:%S", $next_try_at);
1706 $retry_msg = "Retrying at $next_try_fmt.";
1708 Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
# Walk the slash-separated method path (e.g. "collections/get") down the
# API client's method hash.
1711 foreach my $key (split(/\//, $method_name)) {
1712 $method = $method->{$key};
1714 return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1718 # Given a $?, return a human-readable exit code string like "0" or
1719 # "1" or "0 with signal 1" or "1 with signal 11".
1720 my $exitcode = shift;
# $? layout: high byte = exit status; low 7 bits = terminating signal;
# bit 0x80 = core dumped.
1721 my $s = $exitcode >> 8;
1722 if ($exitcode & 0x7f) {
1723 $s .= " with signal " . ($exitcode & 0x7f);
1725 if ($exitcode & 0x80) {
1726 $s .= " with core dump";
1734 # checkout-and-build
# Second life of this script: when re-executed on a compute node it
# unpacks the git archive appended after __DATA__ into CRUNCH_SRC,
# builds/installs it, then execs the task command line.
1737 use File::Path qw( make_path remove_tree );
1739 my $destdir = $ENV{"CRUNCH_SRC"};
1740 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1741 my $repo = $ENV{"CRUNCH_SRC_URL"};
1742 my $job_work = $ENV{"JOB_WORK"};
1743 my $task_work = $ENV{"TASK_WORK"};
1745 for my $dir ($destdir, $job_work, $task_work) {
1748 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Empty the per-task work dir but keep the directory itself.
1753 remove_tree($task_work, {keep_root => 1});
# Read the embedded git archive (everything after __DATA__).
1756 my @git_archive_data = <DATA>;
1757 if (!@git_archive_data) {
1758 # Nothing to extract -> nothing to install.
1759 run_argv_and_exit();
# Serialize installs into $destdir across concurrent tasks on this node.
1762 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# "$destdir.commit" is a symlink naming the commit currently installed.
1764 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1765 # This version already installed -> nothing to do.
1766 run_argv_and_exit();
1769 unlink "$destdir.commit";
# Capture build output in $destdir.log; keep the real stderr around so
# shell_or_die can dump the log there on failure.
1770 open STDERR_ORIG, ">&STDERR";
1771 open STDOUT, ">", "$destdir.log";
1772 open STDERR, ">&STDOUT";
# Stream the embedded archive into tar, extracting into $destdir.
1775 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1776 print TARX @git_archive_data;
1778 die "'tar -C $destdir -xf -' exited $?: $!";
1782 chomp ($pwd = `pwd`);
1783 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Install the bundled Python SDK into a virtualenv under $install_dir.
1786 for my $src_path ("$destdir/arvados/sdk/python") {
1788 shell_or_die ("virtualenv", $install_dir);
1789 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run the source tree's own install hook, in order of preference:
# crunch_scripts/install, then tests/autotests.sh, then install.sh.
1793 if (-e "$destdir/crunch_scripts/install") {
1794 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1795 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1797 shell_or_die ("./tests/autotests.sh", $install_dir);
1798 } elsif (-e "./install.sh") {
1799 shell_or_die ("./install.sh", $install_dir);
# Record the installed commit atomically: create a fresh symlink under a
# temporary name, then rename it into place.
1803 unlink "$destdir.commit.new";
1804 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1805 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1810 run_argv_and_exit();
1812 sub run_argv_and_exit
# exec the task command line given in @ARGV; reaching the die means exec failed.
1816 die "Cannot exec `@ARGV`: $!";
# shell_or_die fragment: run a command (echoing it when DEBUG is set);
# on failure, dump the captured build log to the real stderr and die.
1824 if ($ENV{"DEBUG"}) {
1825 print STDERR "@_\n";
1827 if (system (@_) != 0) {
1829 my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
# Restore the original stderr (build output was redirected to the log
# earlier) so the log contents reach the job log.
1830 open STDERR, ">&STDERR_ORIG";
1831 system ("cat $destdir.log >&2");
1832 die "@_ failed ($err): $exitstatus";