2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
71 Save a checkpoint and continue.
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or deallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
91 use Digest::MD5 qw(md5_hex);
97 use File::Path qw( make_path remove_tree );
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
# Set up per-job scratch space. TMPDIR defaults to /tmp; CRUNCH_TMP defaults
# to $TMPDIR/crunch-job, made uid-unique for non-root users other than
# "crunch" so concurrent users on one host don't collide.
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105 if ($ENV{"USER"} ne "crunch" && $< != 0) {
106 # use a tmp dir unique for my uid
107 $ENV{"CRUNCH_TMP"} .= "-$<";
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Derived locations: JOB_WORK holds per-task working state, CRUNCH_INSTALL
# receives installed SDKs/tools. CRUNCH_WORK is a deprecated alias for
# JOB_WORK kept for older scripts.
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
127 GetOptions('force-unlock' => \$force_unlock,
128 'git-dir=s' => \$git_dir,
129 'job=s' => \$jobspec,
130 'job-api-token=s' => \$job_api_token,
131 'no-clear-tmp' => \$no_clear_tmp,
132 'resume-stash=s' => \$resume_stash,
135 if (defined $job_api_token) {
136 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
139 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
145 $main::ENV{CRUNCH_DEBUG} = 1;
149 $main::ENV{CRUNCH_DEBUG} = 0;
154 my $arv = Arvados->new('apiVersion' => 'v1');
162 my $User = api_call("users/current");
164 if ($jobspec =~ /^[-a-z\d]+$/)
166 # $jobspec is an Arvados UUID, not a JSON job specification
167 $Job = api_call("jobs/get", uuid => $jobspec);
168 if (!$force_unlock) {
169 # Claim this job, and make sure nobody else does
170 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
172 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
179 $Job = JSON::decode_json($jobspec);
183 map { croak ("No $_ specified") unless $Job->{$_} }
184 qw(script script_version script_parameters);
187 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188 $Job->{'started_at'} = gmtime;
189 $Job->{'state'} = 'Running';
191 $Job = api_call("jobs/create", job => $Job);
193 $job_id = $Job->{'uuid'};
195 my $keep_logfile = $job_id . '.log.txt';
196 log_writer_start($keep_logfile);
198 $Job->{'runtime_constraints'} ||= {};
199 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
200 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
204 $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
205 chomp($gem_versions);
206 chop($gem_versions); # Closing parentheses
211 "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
213 Log (undef, "check slurm allocation");
216 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
220 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
221 push @sinfo, "$localcpus localhost";
223 if (exists $ENV{SLURM_NODELIST})
225 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
229 my ($ncpus, $slurm_nodelist) = split;
230 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
233 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
236 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
239 foreach (split (",", $ranges))
252 push @nodelist, map {
254 $n =~ s/\[[-,\d]+\]/$_/;
261 push @nodelist, $nodelist;
264 foreach my $nodename (@nodelist)
266 Log (undef, "node $nodename - $ncpus slots");
267 my $node = { name => $nodename,
271 foreach my $cpu (1..$ncpus)
273 push @slot, { node => $node,
277 push @node, @nodelist;
282 # Ensure that we get one jobstep running on each allocated node before
283 # we start overloading nodes with concurrent steps
285 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
288 $Job->update_attributes(
289 'tasks_summary' => { 'failed' => 0,
294 Log (undef, "start");
# Install signal handlers. Per the POD above: INT/QUIT/TSTP request a
# checkpoint ("freeze"), ALRM requests an info dump, CONT clears a pending
# freeze request, HUP asks the main loop to refresh node allocation and job
# attributes. TERM aborts via croak. Handlers only set $main:: flags; the
# main loop polls and acts on them between scheduling steps.
295 $SIG{'INT'} = sub { $main::please_freeze = 1; };
296 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
297 $SIG{'TERM'} = \&croak;
298 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
299 $SIG{'ALRM'} = sub { $main::please_info = 1; };
300 $SIG{'CONT'} = sub { $main::please_continue = 1; };
301 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
# Initialize the request flags the handlers above set.
303 $main::please_freeze = 0;
304 $main::please_info = 0;
305 $main::please_continue = 0;
306 $main::please_refresh = 0;
307 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
309 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
310 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
311 $ENV{"JOB_UUID"} = $job_id;
314 my @jobstep_todo = ();
315 my @jobstep_done = ();
316 my @jobstep_tomerge = ();
317 my $jobstep_tomerge_level = 0;
319 my $squeue_kill_checked;
320 my $latest_refresh = scalar time;
324 if (defined $Job->{thawedfromkey})
326 thaw ($Job->{thawedfromkey});
330 my $first_task = api_call("job_tasks/create", job_task => {
331 'job_uuid' => $Job->{'uuid'},
336 push @jobstep, { 'level' => 0,
338 'arvados_task' => $first_task,
340 push @jobstep_todo, 0;
346 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
349 my $build_script = handle_readall(\*DATA);
350 my $nodelist = join(",", @node);
351 my $git_tar_count = 0;
353 if (!defined $no_clear_tmp) {
354 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
355 Log (undef, "Clean work dirs");
357 my $cleanpid = fork();
360 # Find FUSE mounts that look like Keep mounts (the mount path has the
361 # word "keep") and unmount them. Then clean up work directories.
362 # TODO: When #5036 is done and widely deployed, we can get rid of the
363 # regular expression and just unmount everything with type fuse.keep.
364 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
365 ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
370 last if $cleanpid == waitpid (-1, WNOHANG);
371 freeze_if_want_freeze ($cleanpid);
372 select (undef, undef, undef, 0.1);
374 Log (undef, "Cleanup command exited ".exit_status_s($?));
377 # If this job requires a Docker image, install that.
378 my $docker_bin = "/usr/bin/docker.io";
379 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
380 if ($docker_locator = $Job->{docker_image_locator}) {
381 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
384 croak("No Docker image hash found from locator $docker_locator");
386 $docker_stream =~ s/^\.//;
387 my $docker_install_script = qq{
388 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
389 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
392 my $docker_pid = fork();
393 if ($docker_pid == 0)
395 srun (["srun", "--nodelist=" . join(',', @node)],
396 ["/bin/sh", "-ec", $docker_install_script]);
401 last if $docker_pid == waitpid (-1, WNOHANG);
402 freeze_if_want_freeze ($docker_pid);
403 select (undef, undef, undef, 0.1);
407 croak("Installing Docker image from $docker_locator exited "
411 # Determine whether this version of Docker supports memory+swap limits.
412 srun(["srun", "--nodelist=" . $node[0]],
413 ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
415 $docker_limitmem = ($? == 0);
417 if ($Job->{arvados_sdk_version}) {
418 # The job also specifies an Arvados SDK version. Add the SDKs to the
419 # tar file for the build script to install.
420 Log(undef, sprintf("Packing Arvados SDK version %s for installation",
421 $Job->{arvados_sdk_version}));
422 add_git_archive("git", "--git-dir=$git_dir", "archive",
423 "--prefix=.arvados.sdk/",
424 $Job->{arvados_sdk_version}, "sdk");
428 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
429 # If script_version looks like an absolute path, *and* the --git-dir
430 # argument was not given -- which implies we were not invoked by
431 # crunch-dispatch -- we will use the given path as a working
432 # directory instead of resolving script_version to a git commit (or
433 # doing anything else with git).
434 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
435 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
438 # Resolve the given script_version to a git commit sha1. Also, if
439 # the repository is remote, clone it into our local filesystem: this
440 # ensures "git archive" will work, and is necessary to reliably
441 # resolve a symbolic script_version like "master^".
442 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
444 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
446 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
448 # If we're running under crunch-dispatch, it will have already
449 # pulled the appropriate source tree into its own repository, and
450 # given us that repo's path as $git_dir.
452 # If we're running a "local" job, we might have to fetch content
453 # from a remote repository.
455 # (Currently crunch-dispatch gives a local path with --git-dir, but
456 # we might as well accept URLs there too in case it changes its
458 my $repo = $git_dir || $Job->{'repository'};
460 # Repository can be remote or local. If remote, we'll need to fetch it
461 # to a local dir before doing `git log` et al.
464 if ($repo =~ m{://|^[^/]*:}) {
465 # $repo is a git url we can clone, like git:// or https:// or
466 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
467 # not recognized here because distinguishing that from a local
468 # path is too fragile. If you really need something strange here,
469 # use the ssh:// form.
470 $repo_location = 'remote';
471 } elsif ($repo =~ m{^\.*/}) {
472 # $repo is a local path to a git index. We'll also resolve ../foo
473 # to ../foo/.git if the latter is a directory. To help
474 # disambiguate local paths from named hosted repositories, this
475 # form must be given as ./ or ../ if it's a relative path.
476 if (-d "$repo/.git") {
477 $repo = "$repo/.git";
479 $repo_location = 'local';
481 # $repo is none of the above. It must be the name of a hosted
483 my $arv_repo_list = api_call("repositories/list",
484 'filters' => [['name','=',$repo]]);
485 my @repos_found = @{$arv_repo_list->{'items'}};
486 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
488 Log(undef, "Repository '$repo' -> "
489 . join(", ", map { $_->{'uuid'} } @repos_found));
492 croak("Error: Found $n_found repositories with name '$repo'.");
494 $repo = $repos_found[0]->{'fetch_url'};
495 $repo_location = 'remote';
497 Log(undef, "Using $repo_location repository '$repo'");
498 $ENV{"CRUNCH_SRC_URL"} = $repo;
500 # Resolve given script_version (we'll call that $treeish here) to a
501 # commit sha1 ($commit).
502 my $treeish = $Job->{'script_version'};
504 if ($repo_location eq 'remote') {
505 # We minimize excess object-fetching by re-using the same bare
506 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
507 # just keep adding remotes to it as needed.
508 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
509 my $gitcmd = "git --git-dir=\Q$local_repo\E";
511 # Set up our local repo for caching remote objects, making
513 if (!-d $local_repo) {
514 make_path($local_repo) or croak("Error: could not create $local_repo");
516 # This works (exits 0 and doesn't delete fetched objects) even
517 # if $local_repo is already initialized:
518 `$gitcmd init --bare`;
520 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
523 # If $treeish looks like a hash (or abbrev hash) we look it up in
524 # our local cache first, since that's cheaper. (We don't want to
525 # do that with tags/branches though -- those change over time, so
526 # they should always be resolved by the remote repo.)
527 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
528 # Hide stderr because it's normal for this to fail:
529 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
531 # Careful not to resolve a branch named abcdeff to commit 1234567:
532 $sha1 =~ /^$treeish/ &&
533 $sha1 =~ /^([0-9a-f]{40})$/s) {
535 Log(undef, "Commit $commit already present in $local_repo");
539 if (!defined $commit) {
540 # If $treeish isn't just a hash or abbrev hash, or isn't here
541 # yet, we need to fetch the remote to resolve it correctly.
543 # First, remove all local heads. This prevents a name that does
544 # not exist on the remote from resolving to (or colliding with)
545 # a previously fetched branch or tag (possibly from a different
547 remove_tree("$local_repo/refs/heads", {keep_root => 1});
549 Log(undef, "Fetching objects from $repo to $local_repo");
550 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
552 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
556 # Now that the data is all here, we will use our local repo for
557 # the rest of our git activities.
561 my $gitcmd = "git --git-dir=\Q$repo\E";
562 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
563 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
564 croak("`$gitcmd rev-list` exited "
566 .", '$treeish' not found. Giving up.");
569 Log(undef, "Version $treeish is commit $commit");
571 if ($commit ne $Job->{'script_version'}) {
572 # Record the real commit id in the database, frozentokey, logs,
573 # etc. -- instead of an abbreviation or a branch name which can
574 # become ambiguous or point to a different commit in the future.
575 if (!$Job->update_attributes('script_version' => $commit)) {
576 croak("Error: failed to update job's script_version attribute");
580 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
581 add_git_archive("$gitcmd archive ''\Q$commit\E");
584 my $git_archive = combined_git_archive();
585 if (!defined $git_archive) {
586 Log(undef, "Skip install phase (no git archive)");
588 Log(undef, "Warning: This probably means workers have no source tree!");
592 Log(undef, "Run install script on all workers");
594 my @srunargs = ("srun",
595 "--nodelist=$nodelist",
596 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
597 my @execargs = ("sh", "-c",
598 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
600 my $installpid = fork();
601 if ($installpid == 0)
603 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
608 last if $installpid == waitpid (-1, WNOHANG);
609 freeze_if_want_freeze ($installpid);
610 select (undef, undef, undef, 0.1);
612 my $install_exited = $?;
613 Log (undef, "Install script exited ".exit_status_s($install_exited));
614 foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
615 unlink($tar_filename);
617 exit (1) if $install_exited != 0;
620 foreach (qw (script script_version script_parameters runtime_constraints))
624 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
626 foreach (split (/\n/, $Job->{knobs}))
628 Log (undef, "knob " . $_);
633 $main::success = undef;
639 my $thisround_succeeded = 0;
640 my $thisround_failed = 0;
641 my $thisround_failed_multiple = 0;
643 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
644 or $a <=> $b } @jobstep_todo;
645 my $level = $jobstep[$jobstep_todo[0]]->{level};
646 Log (undef, "start level $level");
651 my @freeslot = (0..$#slot);
654 my $progress_is_dirty = 1;
655 my $progress_stats_updated = 0;
657 update_progress_stats();
662 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
664 my $id = $jobstep_todo[$todo_ptr];
665 my $Jobstep = $jobstep[$id];
666 if ($Jobstep->{level} != $level)
671 pipe $reader{$id}, "writer" or croak ($!);
672 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
673 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
675 my $childslot = $freeslot[0];
676 my $childnode = $slot[$childslot]->{node};
677 my $childslotname = join (".",
678 $slot[$childslot]->{node}->{name},
679 $slot[$childslot]->{cpu});
681 my $childpid = fork();
684 $SIG{'INT'} = 'DEFAULT';
685 $SIG{'QUIT'} = 'DEFAULT';
686 $SIG{'TERM'} = 'DEFAULT';
688 foreach (values (%reader))
692 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
693 open(STDOUT,">&writer");
694 open(STDERR,">&writer");
699 delete $ENV{"GNUPGHOME"};
700 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
701 $ENV{"TASK_QSEQUENCE"} = $id;
702 $ENV{"TASK_SEQUENCE"} = $level;
703 $ENV{"JOB_SCRIPT"} = $Job->{script};
704 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
705 $param =~ tr/a-z/A-Z/;
706 $ENV{"JOB_PARAMETER_$param"} = $value;
708 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
709 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
710 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
711 $ENV{"HOME"} = $ENV{"TASK_WORK"};
712 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
713 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
714 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
715 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
721 "--nodelist=".$childnode->{name},
722 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
723 "--job-name=$job_id.$id.$$",
726 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
727 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
728 ."&& cd $ENV{CRUNCH_TMP} "
729 # These environment variables get used explicitly later in
730 # $command. No tool is expected to read these values directly.
731 .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
732 .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
733 ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
734 ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
735 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
738 my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
739 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
740 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
741 # We only set memory limits if Docker lets us limit both memory and swap.
742 # Memory limits alone have been supported longer, but subprocesses tend
743 # to get SIGKILL if they exceed that without any swap limit set.
744 # See #5642 for additional background.
745 if ($docker_limitmem) {
746 $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
749 # Dynamically configure the container to use the host system as its
750 # DNS server. Get the host's global addresses from the ip command,
751 # and turn them into docker --dns options using gawk.
753 q{$(ip -o address show scope global |
754 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
756 # The source tree and $destdir directory (which we have
757 # installed on the worker host) are available in the container,
758 # under the same path.
759 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
760 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
762 # Currently, we make arv-mount's mount point appear at /keep
763 # inside the container (instead of using the same path as the
764 # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
765 # crunch scripts and utilities must not rely on this. They must
766 # use $TASK_KEEPMOUNT.
767 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
768 $ENV{TASK_KEEPMOUNT} = "/keep";
770 # TASK_WORK is almost exactly like a docker data volume: it
771 # starts out empty, is writable, and persists until no
772 # containers use it any more. We don't use --volumes-from to
773 # share it with other containers: it is only accessible to this
774 # task, and it goes away when this task stops.
776 # However, a docker data volume is writable only by root unless
777 # the mount point already happens to exist in the container with
778 # different permissions. Therefore, we [1] assume /tmp already
779 # exists in the image and is writable by the crunch user; [2]
780 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
781 # writable if they are created by docker while setting up the
782 # other --volumes); and [3] create $TASK_WORK inside the
783 # container using $build_script.
784 $command .= "--volume=/tmp ";
785 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
786 $ENV{"HOME"} = $ENV{"TASK_WORK"};
787 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
789 # TODO: Share a single JOB_WORK volume across all task
790 # containers on a given worker node, and delete it when the job
791 # ends (and, in case that doesn't work, when the next job
794 # For now, use the same approach as TASK_WORK above.
795 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
797 while (my ($env_key, $env_val) = each %ENV)
799 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
800 $command .= "--env=\Q$env_key=$env_val\E ";
803 $command .= "--env=\QHOME=$ENV{HOME}\E ";
804 $command .= "\Q$docker_hash\E ";
805 $command .= "stdbuf --output=0 --error=0 ";
806 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
809 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
810 $command .= "stdbuf --output=0 --error=0 ";
811 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
814 my @execargs = ('bash', '-c', $command);
815 srun (\@srunargs, \@execargs, undef, $build_script);
816 # exec() failed, we assume nothing happened.
817 die "srun() failed on build script\n";
820 if (!defined $childpid)
827 $proc{$childpid} = { jobstep => $id,
830 jobstepname => "$job_id.$id.$childpid",
832 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
833 $slot[$childslot]->{pid} = $childpid;
835 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
836 Log ($id, "child $childpid started on $childslotname");
837 $Jobstep->{starttime} = time;
838 $Jobstep->{node} = $childnode->{name};
839 $Jobstep->{slotindex} = $childslot;
840 delete $Jobstep->{stderr};
841 delete $Jobstep->{finishtime};
843 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
844 $Jobstep->{'arvados_task'}->save;
846 splice @jobstep_todo, $todo_ptr, 1;
849 $progress_is_dirty = 1;
853 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
855 last THISROUND if $main::please_freeze || defined($main::success);
856 if ($main::please_info)
858 $main::please_info = 0;
860 create_output_collection();
862 update_progress_stats();
869 check_refresh_wanted();
871 update_progress_stats();
872 select (undef, undef, undef, 0.1);
874 elsif (time - $progress_stats_updated >= 30)
876 update_progress_stats();
878 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
879 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
881 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
882 .($thisround_failed+$thisround_succeeded)
883 .") -- giving up on this round";
884 Log (undef, $message);
888 # move slots from freeslot to holdslot (or back to freeslot) if necessary
889 for (my $i=$#freeslot; $i>=0; $i--) {
890 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
891 push @holdslot, (splice @freeslot, $i, 1);
894 for (my $i=$#holdslot; $i>=0; $i--) {
895 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
896 push @freeslot, (splice @holdslot, $i, 1);
900 # give up if no nodes are succeeding
901 if (!grep { $_->{node}->{losing_streak} == 0 &&
902 $_->{node}->{hold_count} < 4 } @slot) {
903 my $message = "Every node has failed -- giving up on this round";
904 Log (undef, $message);
911 push @freeslot, splice @holdslot;
912 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
915 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
918 if ($main::please_continue) {
919 $main::please_continue = 0;
922 $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
926 check_refresh_wanted();
928 update_progress_stats();
929 select (undef, undef, undef, 0.1);
930 killem (keys %proc) if $main::please_freeze;
934 update_progress_stats();
935 freeze_if_want_freeze();
938 if (!defined $main::success)
941 $thisround_succeeded == 0 &&
942 ($thisround_failed == 0 || $thisround_failed > 4))
944 my $message = "stop because $thisround_failed tasks failed and none succeeded";
945 Log (undef, $message);
954 goto ONELEVEL if !defined $main::success;
957 release_allocation();
959 my $collated_output = &create_output_collection();
961 if (!$collated_output) {
962 Log (undef, "Failed to write output collection");
965 Log(undef, "job output $collated_output");
966 $Job->update_attributes('output' => $collated_output);
969 Log (undef, "finish");
974 if ($collated_output && $main::success) {
975 $final_state = 'Complete';
977 $final_state = 'Failed';
979 $Job->update_attributes('state' => $final_state);
981 exit (($final_state eq 'Complete') ? 0 : 1);
985 sub update_progress_stats
# Push current task counts (todo/done/running) into the job's tasks_summary
# attribute on the API server and log a one-line status. Skipped entirely
# unless $progress_is_dirty indicates something changed since the last call;
# always records the call time in $progress_stats_updated first.
987   $progress_stats_updated = time;
988   return if !$progress_is_dirty;
# "running" is derived from slot occupancy: total slots minus free minus held.
989   my ($todo, $done, $running) = (scalar @jobstep_todo,
990                                  scalar @jobstep_done,
991                                  scalar @slot - scalar @freeslot - scalar @holdslot);
992   $Job->{'tasks_summary'} ||= {};
993   $Job->{'tasks_summary'}->{'todo'} = $todo;
994   $Job->{'tasks_summary'}->{'done'} = $done;
995   $Job->{'tasks_summary'}->{'running'} = $running;
996   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
997   Log (undef, "status: $done done, $running running, $todo todo");
998   $progress_is_dirty = 0;
1005 my $pid = waitpid (-1, WNOHANG);
1006 return 0 if $pid <= 0;
1008 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1010 . $slot[$proc{$pid}->{slot}]->{cpu});
1011 my $jobstepid = $proc{$pid}->{jobstep};
1012 my $elapsed = time - $proc{$pid}->{time};
1013 my $Jobstep = $jobstep[$jobstepid];
1015 my $childstatus = $?;
1016 my $exitvalue = $childstatus >> 8;
1017 my $exitinfo = "exit ".exit_status_s($childstatus);
1018 $Jobstep->{'arvados_task'}->reload;
1019 my $task_success = $Jobstep->{'arvados_task'}->{success};
1021 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1023 if (!defined $task_success) {
1024 # task did not indicate one way or the other --> fail
1025 $Jobstep->{'arvados_task'}->{success} = 0;
1026 $Jobstep->{'arvados_task'}->save;
1033 $temporary_fail ||= $Jobstep->{node_fail};
1034 $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1036 ++$thisround_failed;
1037 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1039 # Check for signs of a failed or misconfigured node
1040 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1041 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1042 # Don't count this against jobstep failure thresholds if this
1043 # node is already suspected faulty and srun exited quickly
1044 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1046 Log ($jobstepid, "blaming failure on suspect node " .
1047 $slot[$proc{$pid}->{slot}]->{node}->{name});
1048 $temporary_fail ||= 1;
1050 ban_node_by_slot($proc{$pid}->{slot});
1053 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1054 ++$Jobstep->{'failures'},
1055 $temporary_fail ? 'temporary ' : 'permanent',
1058 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1059 # Give up on this task, and the whole job
1062 # Put this task back on the todo queue
1063 push @jobstep_todo, $jobstepid;
1064 $Job->{'tasks_summary'}->{'failed'}++;
1068 ++$thisround_succeeded;
1069 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1070 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1071 push @jobstep_done, $jobstepid;
1072 Log ($jobstepid, "success in $elapsed seconds");
1074 $Jobstep->{exitcode} = $childstatus;
1075 $Jobstep->{finishtime} = time;
1076 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1077 $Jobstep->{'arvados_task'}->save;
1078 process_stderr ($jobstepid, $task_success);
1079 Log ($jobstepid, sprintf("task output (%d bytes): %s",
1080 length($Jobstep->{'arvados_task'}->{output}),
1081 $Jobstep->{'arvados_task'}->{output}));
1083 close $reader{$jobstepid};
1084 delete $reader{$jobstepid};
1085 delete $slot[$proc{$pid}->{slot}]->{pid};
1086 push @freeslot, $proc{$pid}->{slot};
1089 if ($task_success) {
1091 my $newtask_list = [];
1092 my $newtask_results;
1094 $newtask_results = api_call(
1097 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1099 'order' => 'qsequence',
1100 'offset' => scalar(@$newtask_list),
1102 push(@$newtask_list, @{$newtask_results->{items}});
1103 } while (@{$newtask_results->{items}});
1104 foreach my $arvados_task (@$newtask_list) {
1106 'level' => $arvados_task->{'sequence'},
1108 'arvados_task' => $arvados_task
1110 push @jobstep, $jobstep;
1111 push @jobstep_todo, $#jobstep;
1115 $progress_is_dirty = 1;
1119 sub check_refresh_wanted
# Re-sync selected Job attributes from the API server, but only when the
# CRUNCH_REFRESH_TRIGGER file's mtime (stat slot 9) is newer than our last
# refresh. Used to notice cancellation requests made while the job runs.
1121   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1122   if (@stat && $stat[9] > $latest_refresh) {
1123     $latest_refresh = scalar time;
1124     my $Job2 = api_call("jobs/get", uuid => $jobspec);
# Copy only the cancellation-related attributes from the fresh record into
# our cached $Job; other local state is left untouched.
1125     for my $attr ('cancelled_at',
1126                   'cancelled_by_user_uuid',
1127                   'cancelled_by_client_uuid',
1129       $Job->{$attr} = $Job2->{$attr};
# If the server-side state left "Running" (e.g. Cancelled), log why and ask
# the main loop to freeze/stop via $main::please_freeze.
1131     if ($Job->{'state'} ne "Running") {
1132       if ($Job->{'state'} eq "Cancelled") {
1133         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1135         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1138       $main::please_freeze = 1;
# Body of the squeue-polling routine (its "sub" header is not visible in
# this excerpt; presumably check_squeue — TODO confirm). It rate-limits
# itself, escalates kills for procs whose killtime has arrived, and
# schedules kills for child procs that squeue no longer reports.
1145 # return if the kill list was checked <4 seconds ago
1146 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1150 $squeue_kill_checked = time;
1152 # use killem() on procs whose killtime is reached
1155 if (exists $proc{$_}->{killtime}
1156 && $proc{$_}->{killtime} <= time)
1162 # return if the squeue was checked <60 seconds ago
1163 if (defined $squeue_checked && $squeue_checked > time - 60)
1167 $squeue_checked = time;
1171 # here is an opportunity to check for mysterious problems with local procs
1175 # get a list of steps still running
# "echo ok" is a sentinel: if the last output line is not "ok", squeue
# itself failed and its output must not be trusted.
1176 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1178 if ($squeue[-1] ne "ok")
1184 # which of my jobsteps are running, according to squeue?
# Format "%i %j" yields "<jobid>.<stepid> <name>"; only steps belonging
# to our own SLURM_JOBID are counted.
1188 if (/^(\d+)\.(\d+) (\S+)/)
1190 if ($1 eq $ENV{SLURM_JOBID})
1197 # which of my active child procs (>60s old) were not mentioned by squeue?
1198 foreach (keys %proc)
1200 if ($proc{$_}->{time} < time - 60
1201 && !exists $ok{$proc{$_}->{jobstepname}}
1202 && !exists $proc{$_}->{killtime})
1204 # kill this proc if it hasn't exited in 30 seconds
1205 $proc{$_}->{killtime} = time + 30;
# Give back the SLURM node allocation for this job by cancelling our own
# SLURM job id. (Guard/closing lines are not visible in this excerpt.)
1211 sub release_allocation
1215 Log (undef, "release job allocation");
1216 system "scancel $ENV{SLURM_JOBID}";
# Fragment of the pipe-draining routine (sub header not visible;
# presumably readfrompipes — TODO confirm): pull available stderr from
# each jobstep's reader pipe into its in-memory buffer.
1224 foreach my $job (keys %reader)
1227 while (0 < sysread ($reader{$job}, $buf, 8192))
1229 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1230 $jobstep[$job]->{stderr} .= $buf;
1231 preprocess_stderr ($job);
# Cap the retained buffer: once it exceeds 16 KiB, drop the oldest 8 KiB.
1232 if (length ($jobstep[$job]->{stderr}) > 16384)
1234 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's buffered stderr, logging each
# one and reacting to known SLURM/Keep failure messages.
1243 sub preprocess_stderr
1247 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the consumed line (plus its newline) from the front of the buffer.
1249 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1250 Log ($job, "stderr $line");
# Expired/unconfirmed allocation: the whole job should checkpoint and stop.
1251 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1253 $main::please_freeze = 1;
# Node-level or Keep failure: mark the step as a node failure and stop
# scheduling new work on that node's slot for a while.
1255 elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1256 $jobstep[$job]->{node_fail} = 1;
1257 ban_node_by_slot($jobstep[$job]->{slotindex});
# Fragment of a final stderr-processing routine (sub header not visible):
# runs one last preprocess pass, then logs any remaining partial lines.
1266 my $task_success = shift;
1267 preprocess_stderr ($job);
1270 Log ($job, "stderr $_");
1271 } split ("\n", $jobstep[$job]->{stderr});
# Fragment of fetch_block (sub header not visible): read a Keep block
# via "arv-get <hash>" and return its contents as a string, or undef on
# any read/exit error.
1278 if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1279 Log(undef, "fetch_block run error from arv-get $hash: $!");
1282 my $output_block = "";
1285 my $bytes = sysread($keep, $buf, 1024 * 1024);
1286 if (!defined $bytes) {
1287 Log(undef, "fetch_block read error from arv-get: $!");
# A failed read invalidates everything collected so far.
1288 $output_block = undef;
1290 } elsif ($bytes == 0) {
1291 # sysread returns 0 at the end of the pipe.
1294 # some bytes were read into buf.
1295 $output_block .= $buf;
# A nonzero arv-get exit status also invalidates the result.
1300 Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1301 $output_block = undef;
1303 return $output_block;
1306 # Create a collection by concatenating the output of all tasks (each
1307 # task's output is either a manifest fragment, a locator for a
1308 # manifest fragment stored in Keep, or nothing at all). Return the
1309 # portable_data_hash of the new collection.
1310 sub create_output_collection
1311 Log (undef, "collate");
# Spawn a Python child that reads the collated manifest text on stdin
# and creates the collection through the Arvados API, printing its
# portable_data_hash on stdout.
1314 my ($child_out, $child_in);
1315 my $pid = open2($child_out, $child_in, 'python', '-c', q{
1318 print (arvados.api("v1").collections().
1319 create(body={"manifest_text": sys.stdin.read()}).
1320 execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1324 my $manifest_size = 0;
1328 my $output = $_->{'arvados_task'}->{output};
1329 next if (!defined($output));
# A 32-hex-digit locator (optionally with +hints) means the manifest
# fragment lives in Keep and must be fetched; otherwise the task output
# is the manifest text itself.
1331 if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1332 $next_write = fetch_block($output);
1334 $next_write = $output;
1336 if (defined($next_write)) {
1337 if (!defined(syswrite($child_in, $next_write))) {
1338 # There's been an error writing.  Stop the loop.
1339 # We'll log details about the exit code later.
1342 $manifest_size += length($next_write);
1345 my $uuid = $_->{'arvados_task'}->{'uuid'};
1346 Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1351 Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
# Give the child up to 120s to report the new collection's hash.
1354 my $s = IO::Select->new($child_out);
1355 if ($s->can_read(120)) {
1356 sysread($child_out, $joboutput, 1024 * 1024);
1359 Log(undef, "output collection creation exited " . exit_status_s($?));
# Timeout: escalate signals (INT x3, TERM x2, KILL) until the child reaps.
1365 Log (undef, "timed out while creating output collection");
1366 foreach my $signal (2, 2, 2, 15, 15, 9) {
1367 kill($signal, $pid);
1368 last if waitpid($pid, WNOHANG) == -1;
# Fragment of the child-killing routine (sub header not visible;
# presumably killem — TODO confirm). Escalates SIGINT -> SIGTERM ->
# SIGKILL, waiting >4s between escalation steps, and records when each
# signal was sent in %proc.
1382 my $sig = 2; # SIGINT first
1383 if (exists $proc{$_}->{"sent_$sig"} &&
1384 time - $proc{$_}->{"sent_$sig"} > 4)
1386 $sig = 15; # SIGTERM if SIGINT doesn't work
1388 if (exists $proc{$_}->{"sent_$sig"} &&
1389 time - $proc{$_}->{"sent_$sig"} > 4)
1391 $sig = 9; # SIGKILL if SIGTERM doesn't work
1393 if (!exists $proc{$_}->{"sent_$sig"})
1395 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1397 select (undef, undef, undef, 0.1);
1400 kill $sig, $_; # srun wants two SIGINT to really interrupt
1402 $proc{$_}->{"sent_$sig"} = time;
1403 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Fragment of a select()-bitmask builder (enclosing sub not visible):
# marks each filehandle's fd in $bits for use with 4-arg select.
1413 vec($bits,fileno($_),1) = 1;
1419 # Send log output to Keep via arv-put.
1421 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1422 # $log_pipe_pid is the pid of the arv-put subprocess.
1424 # The only functions that should access these variables directly are:
1426 # log_writer_start($logfilename)
1427 # Starts an arv-put pipe, reading data on stdin and writing it to
1428 # a $logfilename file in an output collection.
1430 # log_writer_send($txt)
1431 # Writes $txt to the output log collection.
1433 # log_writer_finish()
1434 # Closes the arv-put pipe and returns the output that it produces.
1436 # log_writer_is_active()
1437 # Returns a true value if there is currently a live arv-put
1438 # process, false otherwise.
# Shared state for the log_writer_* helpers below; $log_pipe_pid doubles
# as the "is active" flag.
1440 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
# Start the arv-put child that streams our log to Keep; both the
# collection name and the file inside it use $logfilename.
1442 sub log_writer_start($)
1444 my $logfilename = shift;
1445 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
# (Some arv-put arguments fall in lines not shown here.)
1447 '--portable-data-hash',
1448 '--project-uuid', $Job->{owner_uuid},
1450 '--name', $logfilename,
1451 '--filename', $logfilename,
# Write one chunk of log text into the running arv-put pipe.
1455 sub log_writer_send($)
1458 print $log_pipe_in $txt;
# Close the arv-put pipe and return whatever arv-put printed (its output
# locator); returns early if no arv-put process is running.
1461 sub log_writer_finish()
1463 return unless $log_pipe_pid;
1465 close($log_pipe_in);
# Give arv-put up to 120s to report its result before declaring a timeout.
1468 my $s = IO::Select->new($log_pipe_out);
1469 if ($s->can_read(120)) {
1470 sysread($log_pipe_out, $arv_put_output, 1024);
1471 chomp($arv_put_output);
1473 Log (undef, "timed out reading from 'arv-put'");
1476 waitpid($log_pipe_pid, 0);
# Reset shared state so log_writer_is_active() reports false again.
1477 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1479 Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1482 return $arv_put_output;
# True while an arv-put log-writer child is alive ($log_pipe_pid set).
1485 sub log_writer_is_active() {
1486 return $log_pipe_pid;
1489 sub Log                         # ($jobstep_id, $logmessage)
# Multi-line messages are split and logged one line at a time.
1491 if ($_[1] =~ /\n/) {
1492 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered while preserving the currently-selected handle.
1497 my $fh = select STDERR; $|=1; select $fh;
1498 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable characters as \NNN octal so log lines stay clean.
1499 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1502 if (log_writer_is_active() || -t STDERR) {
1503 my @gmtime = gmtime;
1504 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1505 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Timestamp goes to interactive terminals and the Keep log, but not to a
# non-tty STDERR (which presumably gets timestamped elsewhere).
1507 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1509 if (log_writer_is_active()) {
1510 log_writer_send($datetime . " " . $message);
# Fragment of the fatal-error handler (sub header not visible): log the
# failure with caller location, then checkpoint and collate any work done
# so far before the process dies.
1517 my ($package, $file, $line) = caller;
1518 my $message = "@_ at $file line $line\n";
1519 Log (undef, $message);
1520 freeze() if @jobstep_todo;
1521 create_output_collection() if @jobstep_todo;
# Fragment of the cleanup routine (sub header not visible): a cancelled
# job just gets its finish time stamped; anything else is marked Failed.
1531 if ($Job->{'state'} eq 'Cancelled') {
1532 $Job->update_attributes('finished_at' => scalar gmtime);
1534 $Job->update_attributes('state' => 'Failed');
# Fragment of save_meta (sub header not visible): finish the log writer
# and record the resulting log locator on the Job, unless this is only a
# checkpoint or no log writer is running.
1541 my $justcheckpoint = shift; # false if this will be the last meta saved
1542 return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1543 return unless log_writer_is_active();
1545 my $loglocator = log_writer_finish();
1546 Log (undef, "log manifest is $loglocator");
# Update both the local cache and the API server's copy of the Job.
1547 $Job->{'log'} = $loglocator;
1548 $Job->update_attributes('log', $loglocator);
# If a freeze has been requested (via signal handlers setting
# $main::please_freeze), release the node allocation, kill remaining
# child procs, reap them, and save outputs before stopping.
1552 sub freeze_if_want_freeze
1554 if ($main::please_freeze)
1556 release_allocation();
1559 # kill some srun procs before freeze+stop
# Ensure every pid passed in is present in %proc so killem() sees it.
1560 map { $proc{$_} = {} } @_;
1563 killem (keys %proc);
1564 select (undef, undef, undef, 0.1);
# Reap all exited children without blocking.
1566 while (($died = waitpid (-1, WNOHANG)) > 0)
1568 delete $proc{$died};
1573 create_output_collection();
# Fragments of three adjacent helpers (headers not visible): freeze and
# thaw are unimplemented stubs, and the last line is an unescaping step
# (\\n -> newline, \\X -> X) from a string-unquoting helper.
1583 Log (undef, "Freeze not implemented");
1590 croak ("Thaw not implemented");
1606 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Fragment of the srun wrapper (sub header not visible): runs a command,
# prefixing it with srun arguments only when running under SLURM.
1613 my $srunargs = shift;
1614 my $execargs = shift;
1615 my $opts = shift || {};
1617 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Log the command with any TOKEN=... values redacted.
1619 $Data::Dumper::Terse = 1;
1620 $Data::Dumper::Indent = 0;
1621 my $show_cmd = Dumper($args);
1622 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1623 $show_cmd =~ s/\n/ /g;
1624 warn "starting: $show_cmd\n";
# When stdin content is supplied, fork a child that writes it and wire
# the child's output to our STDIN.
1626 if (defined $stdin) {
1627 my $child = open STDIN, "-|";
1628 defined $child or die "no fork: $!";
1630 print $stdin or die $!;
1631 close STDOUT or die $!;
# With {fork}, run the command as a subprocess and return its status;
# otherwise (lines not shown) the command replaces this process.
1636 return system (@$args) if $opts->{fork};
1639 warn "ENV size is ".length(join(" ",%ENV));
1640 die "exec failed: $!: @$args";
1644 sub ban_node_by_slot {
1645 # Don't start any new jobsteps on this node for 60 seconds
# hold_until gates scheduling; hold_count tracks repeat offenders.
1647 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1648 $slot[$slotid]->{node}->{hold_count}++;
1649 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Fragment of a lockfile helper (sub header not visible): take an
# exclusive, non-blocking flock on $lockfile or die with the caller's
# message. NOTE(review): bareword handle L and the flock constants come
# from context not shown here.
1654 my ($lockfile, $error_message) = @_;
1655 open L, ">", $lockfile or croak("$lockfile: $!");
1656 if (!flock L, LOCK_EX|LOCK_NB) {
1657 croak("Can't lock $lockfile: $error_message\n");
1661 sub find_docker_image {
1662 # Given a Keep locator, check to see if it contains a Docker image.
1663 # If so, return its stream name and Docker hash.
1664 # If not, return undef for both values.
1665 my $locator = shift;
1666 my ($streamname, $filename);
1667 my $image = api_call("collections/get", uuid => $locator);
# Walk the manifest: each line is a stream name followed by tokens;
# file tokens look like "pos:size:name".
1669 foreach my $line (split(/\n/, $image->{manifest_text})) {
1670 my @tokens = split(/\s+/, $line);
1672 $streamname = shift(@tokens);
1673 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1674 if (defined($filename)) {
1675 return (undef, undef);  # More than one file in the Collection.
1677 $filename = (split(/:/, $filedata, 3))[2];
# A Docker image collection contains exactly one file named
# "<64-hex-digit-hash>.tar"; the hash is the Docker image id.
1682 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1683 return ($streamname, $1);
1685 return (undef, undef);
# Body of the retry-count calculator (sub header not visible; presumably
# retry_count — TODO confirm).
1690 # Calculate the number of times an operation should be retried,
1691 # assuming exponential backoff, and that we're willing to retry as
1692 # long as tasks have been running.  Enforce a minimum of 3 retries.
1693 my ($starttime, $endtime, $timediff, $retries);
1695 $starttime = $jobstep[0]->{starttime};
1696 $endtime = $jobstep[-1]->{finishtime};
1698 if (!defined($starttime)) {
1700 } elsif (!defined($endtime)) {
1701 $timediff = time - $starttime;
# Job finished: shrink the budget by how long ago it finished.
1703 $timediff = ($endtime - $starttime) - (time - $endtime);
# log2(elapsed) retries matches exponential (2**n) backoff timing.
1705 if ($timediff > 0) {
1706 $retries = int(log($timediff) / log(2));
1708 $retries = 1;  # Use the minimum.
1710 return ($retries > 3) ? $retries : 3;
# Body of the retry wrapper (sub header not visible; presumably
# retry_op — TODO confirm).
1714 # Pass in two function references.
1715 # This method will be called with the remaining arguments.
1716 # If it dies, retry it with exponential backoff until it succeeds,
1717 # or until the current retry_count is exhausted.  After each failure
1718 # that can be retried, the second function will be called with
1719 # the current try count (0-based), next try time, and error message.
1720 my $operation = shift;
1721 my $retry_callback = shift;
1722 my $retries = retry_count();
1723 foreach my $try_count (0..$retries) {
# 2**try_count seconds of backoff, measured from before the attempt.
1724 my $next_try = time + (2 ** $try_count);
1725 my $result = eval { $operation->(@_); };
1728 } elsif ($try_count < $retries) {
1729 $retry_callback->($try_count, $next_try, $@);
# Sleep only for whatever backoff time the failed attempt didn't use up.
1730 my $sleep_time = $next_try - time;
1731 sleep($sleep_time) if ($sleep_time > 0);
1734 # Ensure the error message ends in a newline, so Perl doesn't add
1735 # retry_op's line number to it.
# Body of the API-call wrapper (sub header not visible; presumably
# api_call — TODO confirm).
1741 # Pass in a /-separated API method name, and arguments for it.
1742 # This function will call that method, retrying as needed until
1743 # the current retry_count is exhausted, with a log on the first failure.
1744 my $method_name = shift;
# Callback used by retry_op to log each retryable failure.
1745 my $log_api_retry = sub {
1746 my ($try_count, $next_try_at, $errmsg) = @_;
# Strip Perl's "at <script> line N." suffix and collapse whitespace so
# the log line stays on one line.
1747 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1748 $errmsg =~ s/\s/ /g;
1749 $errmsg =~ s/\s+$//;
1751 if ($next_try_at < time) {
1752 $retry_msg = "Retrying.";
1754 my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1755 $retry_msg = "Retrying at $next_try_fmt.";
1757 Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
# Resolve "jobs/get"-style names by walking the API object hash.
1760 foreach my $key (split(/\//, $method_name)) {
1761 $method = $method->{$key};
1763 return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
# Body of the exit-status formatter (sub header not visible; presumably
# exit_status_s — TODO confirm).
1767 # Given a $?, return a human-readable exit code string like "0" or
1768 # "1 or "0 with signal 1" or "1 with signal 11".
1769 my $exitcode = shift;
# High byte is the exit code; low 7 bits the signal; bit 7 = core dump.
1770 my $s = $exitcode >> 8;
1771 if ($exitcode & 0x7f) {
1772 $s .= " with signal " . ($exitcode & 0x7f);
1774 if ($exitcode & 0x80) {
1775 $s .= " with core dump";
1780 sub handle_readall {
1781 # Pass in a glob reference to a file handle.
1782 # Read all its contents and return them as a string.
# (The slurp setup between the declaration and the read is not visible
# in this excerpt.)
1783 my $fh_glob_ref = shift;
1785 return <$fh_glob_ref>;
# Build the path of the n-th saved git archive tarball under CRUNCH_TMP.
1788 sub tar_filename_n {
1790 return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1793 sub add_git_archive {
1794 # Pass in a git archive command as a string or list, a la system().
1795 # This method will save its output to be included in the archive sent to the
# Redirect the archive command's stdout straight into the numbered tar file.
1799 if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1800 croak("Failed to save git archive: $!");
1802 my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1804 waitpid($git_pid, 0);
1807 croak("Failed to save git archive: git exited " . exit_status_s($?));
1811 sub combined_git_archive {
1812 # Combine all saved tar archives into a single archive, then return its
1813 # contents in a string.  Return undef if no archives have been saved.
1814 if ($git_tar_count < 1) {
# Concatenate archives 2..N onto the first using tar's append mode (-A).
1817 my $base_tar_name = tar_filename_n(1);
1818 foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1819 my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1820 if ($tar_exit != 0) {
1821 croak("Error preparing build archive: tar -A exited " .
1822 exit_status_s($tar_exit));
1825 if (!open(GIT_TAR, "<", $base_tar_name)) {
1826 croak("Could not open build archive: $!");
1828 my $tar_contents = handle_readall(\*GIT_TAR);
1830 return $tar_contents;
1836 # This is crunch-job's internal dispatch script. crunch-job running on the API
1837 # server invokes this script on individual compute nodes, or localhost if we're
1838 # running a job locally. It gets called in two modes:
1840 # * No arguments: Installation mode. Read a tar archive from the DATA
1841 # file handle; it includes the Crunch script's source code, and
1842 # maybe SDKs as well. Those should be installed in the proper
1843 # locations. This runs outside of any Docker container, so don't try to
1844 # introspect Crunch's runtime environment.
1846 # * With arguments: Crunch script run mode. This script should set up the
1847 # environment, then run the command specified in the arguments. This runs
1848 # inside any Docker container.
# Prologue of the embedded per-node dispatch script (see comments above):
# read the CRUNCH_* environment, create work directories, and redirect
# STDOUT/STDERR into a per-source log file while keeping the originals.
1851 use File::Path qw( make_path remove_tree );
1852 use POSIX qw(getcwd);
1854 use constant TASK_TEMPFAIL => 111;
1856 # Map SDK subdirectories to the path environments they belong to.
1857 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1859 my $destdir = $ENV{"CRUNCH_SRC"};
1860 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1861 my $repo = $ENV{"CRUNCH_SRC_URL"};
1862 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1863 my $job_work = $ENV{"JOB_WORK"};
1864 my $task_work = $ENV{"TASK_WORK"};
1866 for my $dir ($destdir, $job_work, $task_work) {
1869 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Clear leftover task files but keep the directory itself.
1874 remove_tree($task_work, {keep_root => 1});
# Keep the original handles so run mode can restore them before exec.
1877 open(STDOUT_ORIG, ">&", STDOUT);
1878 open(STDERR_ORIG, ">&", STDERR);
1879 open(STDOUT, ">>", "$destdir.log");
1880 open(STDERR, ">&", STDOUT);
1882 ### Crunch script run mode
1884 # We want to do routine logging during task 0 only.  This gives the user
1885 # the information they need, but avoids repeating the information for every
1888 if ($ENV{TASK_SEQUENCE} eq "0") {
1891 printf STDERR_ORIG "[Crunch] $msg\n", @_;
# Build (once per job) a virtualenv for the bundled Python SDK; a failed
# pip install is a temporary failure (TASK_TEMPFAIL) so the task retries.
1897 my $python_src = "$install_dir/python";
1898 my $venv_dir = "$job_work/.arvados.venv";
1899 my $venv_built = -e "$venv_dir/bin/activate";
1900 if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1901 shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
1902 "--python=python2.7", $venv_dir);
1903 shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1905 $Log->("Built Python SDK virtualenv");
1908 my $pip_bin = "pip";
# Re-wrap the task command so it runs with the virtualenv activated;
# quotemeta protects each original argument from the shell.
1910 $Log->("Running in Python SDK virtualenv");
1911 $pip_bin = "$venv_dir/bin/pip";
1912 my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1913 @ARGV = ("/bin/sh", "-ec",
1914 ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
1915 } elsif (-d $python_src) {
1916 $Log->("Warning: virtualenv not found inside Docker container default " .
1917 "\$PATH. Can't install Python SDK.");
# Report which arvados packages are visible, via pip or dpkg.
1920 my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
1922 $Log->("Using Arvados SDK:");
1923 foreach my $line (split /\n/, $pkgs) {
1927 $Log->("Arvados SDK packages not found");
# Prepend each installed SDK library directory to its language's search
# path variable (see %SDK_ENVVARS).
1930 while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1931 my $sdk_path = "$install_dir/$sdk_dir";
1933 if ($ENV{$sdk_envkey}) {
1934 $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1936 $ENV{$sdk_envkey} = $sdk_path;
1938 $Log->("Arvados SDK added to %s", $sdk_envkey);
# Restore the original handles, then replace this process with the task.
1944 open(STDOUT, ">&", STDOUT_ORIG);
1945 open(STDERR, ">&", STDERR_ORIG);
1947 die "Cannot exec `@ARGV`: $!";
1950 ### Installation mode
# Serialize installs on this node via a lockfile next to the dest dir.
1951 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# The "$destdir.commit" symlink records which commit is installed; if it
# already matches, skip the install entirely.
1953 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1954 # This version already installed -> nothing to do.
1958 unlink "$destdir.commit";
# Unpack the tar archive appended after __DATA__ into $destdir.
1961 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
1962 die "Error launching 'tar -xC $destdir': $!";
1964 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
1965 # get SIGPIPE.  We must feed it data incrementally.
1967 while (read(DATA, $tar_input, 65536)) {
1968 print TARX $tar_input;
1971 die "'tar -xC $destdir' exited $?: $!";
# Move each bundled SDK (python plus the %SDK_ENVVARS languages) from
# the source tree into the install dir.
1976 my $sdk_root = "$destdir/.arvados.sdk/sdk";
1978 foreach my $sdk_lang (("python",
1979 map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
1980 if (-d "$sdk_root/$sdk_lang") {
1981 if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
1982 die "Failed to install $sdk_lang SDK: $!";
1988 my $python_dir = "$install_dir/python";
1989 if ((-d $python_dir) and can_run("python2.7") and
1990 (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
1991 # egg_info failed, probably when it asked git for a build tag.
1992 # Specify no build tag.
1993 open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
1994 print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
# Run the repository's own install hook if one exists, in precedence
# order: crunch_scripts/install, tests/autotests.sh, install.sh.
1998 if (-e "$destdir/crunch_scripts/install") {
1999 shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2000 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2002 shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2003 } elsif (-e "./install.sh") {
2004 shell_or_die (undef, "./install.sh", $install_dir);
# Record the installed commit atomically: write a new symlink, then
# rename it over the old one.
2008 unlink "$destdir.commit.new";
2009 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
2010 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# Fragment of can_run (sub header not visible): probe for a command by
# running "which" and draining its output; the success test is on lines
# not shown here.
2016 my $command_name = shift;
2017 open(my $which, "-|", "which", $command_name);
2018 while (<$which>) { }
2025 my $exitcode = shift;
2027 if ($ENV{"DEBUG"}) {
2028 print STDERR "@_\n";
2030 if (system (@_) != 0) {
2033 my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2034 open STDERR, ">&STDERR_ORIG";
2035 system ("cat $destdir.log >&2");
2036 warn "@_ failed ($err): $exitstatus";
2037 if (defined($exitcode)) {
2041 exit (($code >> 8) || 1);