2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
10 crunch-job: Execute job steps, save snapshots as requested, collate output.
14 Obtain job details from Arvados, run tasks on compute nodes (typically
15 invoked by scheduler on controller):
17 crunch-job --job x-y-z --git-dir /path/to/repo/.git
19 Obtain job details from command line, run tasks on local machine
20 (typically invoked by application or developer on VM):
22 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
24 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
32 If the job is already locked, steal the lock and run it anyway.
36 Path to a .git directory (or a git URL) where the commit given in the
37 job's C<script_version> attribute is to be found. If this is I<not>
38 given, the job's C<repository> attribute will be used.
42 Arvados API authorization token to use during the course of the job.
46 Do not clear per-job/task temporary directories during initial job
47 setup. This can speed up development and debugging when running jobs
52 UUID of the job to run, or a JSON-encoded job resource without a
53 UUID. If the latter is given, a new job object will be created.
57 =head1 RUNNING JOBS LOCALLY
59 crunch-job's log messages appear on stderr along with the job tasks'
60 stderr streams. The log is saved in Keep at each checkpoint and when
63 If the job succeeds, the job's output locator is printed on stdout.
65 While the job is running, the following signals are accepted:
69 =item control-C, SIGINT, SIGQUIT
71 Save a checkpoint, terminate any job tasks that are running, and stop.
75 Save a checkpoint and continue.
79 Refresh node allocation (i.e., check whether any nodes have been added to
80 or removed from the allocation) and attributes of the Job record that should affect
81 behavior (e.g., cancel job if cancelled_at becomes non-nil).
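
For example, sending SIGALRM to a running crunch-job makes it save a
checkpoint and keep going, and SIGHUP makes it refresh its view of the node
allocation and the Job record:

  kill -ALRM <pid of crunch-job>
  kill -HUP <pid of crunch-job>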
89 use POSIX ':sys_wait_h';
90 use POSIX qw(strftime);
91 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
95 use Digest::MD5 qw(md5_hex);
101 use File::Path qw( make_path remove_tree );
103 use constant TASK_TEMPFAIL => 111;
104 use constant EX_TEMPFAIL => 75;
105 use constant EX_RETRY_UNLOCKED => 93;
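# TASK_TEMPFAIL is the exit code a task process can use to report a
# temporary failure; such tasks are retried rather than failing the job
# outright. EX_TEMPFAIL is the sysexits.h value, used for early failures
# such as not being able to lock the job. EX_RETRY_UNLOCKED asks
# crunch-dispatch to re-queue the job after a transient failure that
# happened while this process held the job lock (see exit_retry_unlocked).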
107 $ENV{"TMPDIR"} ||= "/tmp";
108 unless (defined $ENV{"CRUNCH_TMP"}) {
109 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
110 if ($ENV{"USER"} ne "crunch" && $< != 0) {
111 # use a tmp dir unique to my uid
112 $ENV{"CRUNCH_TMP"} .= "-$<";
116 # Create the tmp directory if it does not exist
117 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
118 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
121 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
122 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
123 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
124 mkdir ($ENV{"JOB_WORK"});
133 my $cgroup_root = "/sys/fs/cgroup";
134 my $docker_bin = "docker.io";
135 my $docker_run_args = "";
136 GetOptions('force-unlock' => \$force_unlock,
137 'git-dir=s' => \$git_dir,
138 'job=s' => \$jobspec,
139 'job-api-token=s' => \$job_api_token,
140 'no-clear-tmp' => \$no_clear_tmp,
141 'resume-stash=s' => \$resume_stash,
142 'cgroup-root=s' => \$cgroup_root,
143 'docker-bin=s' => \$docker_bin,
144 'docker-run-args=s' => \$docker_run_args,
147 if (defined $job_api_token) {
148 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
151 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
156 $main::ENV{CRUNCH_DEBUG} = 1;
160 $main::ENV{CRUNCH_DEBUG} = 0;
163 my $arv = Arvados->new('apiVersion' => 'v1');
172 if ($jobspec =~ /^[-a-z\d]+$/)
174 # $jobspec is an Arvados UUID, not a JSON job specification
175 $Job = api_call("jobs/get", uuid => $jobspec);
180 $local_job = JSON::decode_json($jobspec);
184 # Make sure our workers (our slurm nodes, localhost, or whatever) are
185 # at least able to run basic commands: they aren't down or severely
188 if (($Job || $local_job)->{docker_image_locator}) {
189 $cmd = [$docker_bin, 'ps', '-q'];
191 Log(undef, "Sanity check is `@$cmd`");
192 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
193 ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
195 {label => "sanity check"});
197 Log(undef, "Sanity check failed: ".exit_status_s($exited));
200 Log(undef, "Sanity check OK");
203 my $User = api_call("users/current");
206 if (!$force_unlock) {
207 # Claim this job, and make sure nobody else does
208 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
210 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
219 map { croak ("No $_ specified") unless $local_job->{$_} }
220 qw(script script_version script_parameters);
223 $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
224 $local_job->{'started_at'} = gmtime;
225 $local_job->{'state'} = 'Running';
227 $Job = api_call("jobs/create", job => $local_job);
229 $job_id = $Job->{'uuid'};
231 my $keep_logfile = $job_id . '.log.txt';
232 log_writer_start($keep_logfile);
234 $Job->{'runtime_constraints'} ||= {};
235 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
236 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
238 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
240 $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
241 chomp($gem_versions);
242 chop($gem_versions); # Remove the trailing ")"
247 "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
249 Log (undef, "check slurm allocation");
252 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
256 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
257 push @sinfo, "$localcpus localhost";
259 if (exists $ENV{SLURM_NODELIST})
261 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
265 my ($ncpus, $slurm_nodelist) = split;
266 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
269 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
272 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
275 foreach (split (",", $ranges))
288 push @nodelist, map {
290 $n =~ s/\[[-,\d]+\]/$_/;
297 push @nodelist, $nodelist;
300 foreach my $nodename (@nodelist)
302 Log (undef, "node $nodename - $ncpus slots");
303 my $node = { name => $nodename,
305 # The number of consecutive times a task has been dispatched
306 # to this node and failed.
308 # The number of consecutive times that SLURM has reported
309 # a node failure since the last successful task.
311 # Don't dispatch work to this node until this time
312 # (in seconds since the epoch) has passed.
314 foreach my $cpu (1..$ncpus)
316 push @slot, { node => $node,
320 push @node, @nodelist;
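# For reference: the nodelist parsing above expands an entry like
# "compute[1-3,9]" into the hostnames compute1, compute2, compute3 and
# compute9, and each of those node names then gets $ncpus entries in @slot.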
325 # Ensure that we get one jobstep running on each allocated node before
326 # we start overloading nodes with concurrent steps
328 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
331 $Job->update_attributes(
332 'tasks_summary' => { 'failed' => 0,
337 Log (undef, "start");
338 $SIG{'INT'} = sub { $main::please_freeze = 1; };
339 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
340 $SIG{'TERM'} = \&croak;
341 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
342 $SIG{'ALRM'} = sub { $main::please_info = 1; };
343 $SIG{'CONT'} = sub { $main::please_continue = 1; };
344 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
346 $main::please_freeze = 0;
347 $main::please_info = 0;
348 $main::please_continue = 0;
349 $main::please_refresh = 0;
350 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
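# Copy NOCACHE* settings from the job's free-form "knobs" text (one
# KEY=value pair per line) into the environment: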
352 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
353 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
354 $ENV{"JOB_UUID"} = $job_id;
357 my @jobstep_todo = ();
358 my @jobstep_done = ();
359 my @jobstep_tomerge = ();
360 my $jobstep_tomerge_level = 0;
361 my $squeue_checked = 0;
362 my $sinfo_checked = 0;
363 my $latest_refresh = scalar time;
367 if (defined $Job->{thawedfromkey})
369 thaw ($Job->{thawedfromkey});
373 my $first_task = api_call("job_tasks/create", job_task => {
374 'job_uuid' => $Job->{'uuid'},
379 push @jobstep, { 'level' => 0,
381 'arvados_task' => $first_task,
383 push @jobstep_todo, 0;
389 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
392 my $build_script = handle_readall(\*DATA);
393 my $nodelist = join(",", @node);
394 my $git_tar_count = 0;
396 if (!defined $no_clear_tmp) {
397 # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
398 # up work directories crunch_tmp/work, crunch_tmp/opt,
400 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
401 ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
403 arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
404 rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
406 {label => "clean work dirs"});
408 exit_retry_unlocked();
412 # If this job requires a Docker image, install that.
413 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
414 if ($docker_locator = $Job->{docker_image_locator}) {
415 Log (undef, "Install docker image $docker_locator");
416 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
419 croak("No Docker image hash found from locator $docker_locator");
421 Log (undef, "docker image hash is $docker_hash");
422 $docker_stream =~ s/^\.//;
423 my $docker_install_script = qq{
425 id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
426 echo "image ID is \$id"
427 [[ \${id} = \Q$docker_hash\E ]]
429 if loaded >&2 2>/dev/null; then
430 echo >&2 "image is already present"
433 echo >&2 "docker image is not present; loading"
434 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
435 if ! loaded >&2; then
436 echo >&2 "'docker load' exited 0, but image is not found (!)"
439 echo >&2 "image loaded successfully"
442 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
443 ["srun", "--nodelist=" . join(',', @node)],
444 ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
445 {label => "load docker image"});
448 exit_retry_unlocked();
451 # Determine whether this version of Docker supports memory+swap limits.
452 ($exited, $stdout, $stderr, $tempfail) = srun_sync(
453 ["srun", "--nodes=1"],
454 [$docker_bin, 'run', '--help'],
455 {label => "check --memory-swap feature"});
457 exit_retry_unlocked();
459 $docker_limitmem = ($stdout =~ /--memory-swap/);
461 # Find a non-root Docker user to use.
462 # Tries the default user for the container, then 'crunch', then 'nobody',
463 # testing for whether the actual user id is non-zero. This defends against
464 # mistakes but not malice; we intend to harden the security in the future,
465 # so we don't want anyone getting used to their jobs running as root in their
467 my @tryusers = ("", "crunch", "nobody");
468 foreach my $try_user (@tryusers) {
471 if ($try_user eq "") {
472 $label = "check whether default user is UID 0";
475 $label = "check whether user '$try_user' is UID 0";
476 $try_user_arg = "--user=$try_user";
478 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
479 ["srun", "--nodes=1"],
481 "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
484 if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
485 $dockeruserarg = $try_user_arg;
486 if ($try_user eq "") {
487 Log(undef, "Container will run with default user");
489 Log(undef, "Container will run with $dockeruserarg");
492 } elsif ($tempfail) {
493 exit_retry_unlocked();
497 if (!defined $dockeruserarg) {
498 croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
501 if ($Job->{arvados_sdk_version}) {
502 # The job also specifies an Arvados SDK version. Add the SDKs to the
503 # tar file for the build script to install.
504 Log(undef, sprintf("Packing Arvados SDK version %s for installation",
505 $Job->{arvados_sdk_version}));
506 add_git_archive("git", "--git-dir=$git_dir", "archive",
507 "--prefix=.arvados.sdk/",
508 $Job->{arvados_sdk_version}, "sdk");
512 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
513 # If script_version looks like an absolute path, *and* the --git-dir
514 # argument was not given -- which implies we were not invoked by
515 # crunch-dispatch -- we will use the given path as a working
516 # directory instead of resolving script_version to a git commit (or
517 # doing anything else with git).
518 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
519 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
522 # Resolve the given script_version to a git commit sha1. Also, if
523 # the repository is remote, clone it into our local filesystem: this
524 # ensures "git archive" will work, and is necessary to reliably
525 # resolve a symbolic script_version like "master^".
526 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
528 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
530 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
532 # If we're running under crunch-dispatch, it will have already
533 # pulled the appropriate source tree into its own repository, and
534 # given us that repo's path as $git_dir.
536 # If we're running a "local" job, we might have to fetch content
537 # from a remote repository.
539 # (Currently crunch-dispatch gives a local path with --git-dir, but
540 # we might as well accept URLs there too in case it changes its
542 my $repo = $git_dir || $Job->{'repository'};
544 # Repository can be remote or local. If remote, we'll need to fetch it
545 # to a local dir before doing `git log` et al.
548 if ($repo =~ m{://|^[^/]*:}) {
549 # $repo is a git url we can clone, like git:// or https:// or
550 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
551 # not recognized here because distinguishing that from a local
552 # path is too fragile. If you really need something strange here,
553 # use the ssh:// form.
554 $repo_location = 'remote';
555 } elsif ($repo =~ m{^\.*/}) {
556 # $repo is a local path to a git index. We'll also resolve ../foo
557 # to ../foo/.git if the latter is a directory. To help
558 # disambiguate local paths from named hosted repositories, this
559 # form must be given as ./ or ../ if it's a relative path.
560 if (-d "$repo/.git") {
561 $repo = "$repo/.git";
563 $repo_location = 'local';
565 # $repo is none of the above. It must be the name of a hosted
567 my $arv_repo_list = api_call("repositories/list",
568 'filters' => [['name','=',$repo]]);
569 my @repos_found = @{$arv_repo_list->{'items'}};
570 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
572 Log(undef, "Repository '$repo' -> "
573 . join(", ", map { $_->{'uuid'} } @repos_found));
576 croak("Error: Found $n_found repositories with name '$repo'.");
578 $repo = $repos_found[0]->{'fetch_url'};
579 $repo_location = 'remote';
581 Log(undef, "Using $repo_location repository '$repo'");
582 $ENV{"CRUNCH_SRC_URL"} = $repo;
584 # Resolve given script_version (we'll call that $treeish here) to a
585 # commit sha1 ($commit).
586 my $treeish = $Job->{'script_version'};
588 if ($repo_location eq 'remote') {
589 # We minimize excess object-fetching by re-using the same bare
590 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
591 # just keep adding remotes to it as needed.
592 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
593 my $gitcmd = "git --git-dir=\Q$local_repo\E";
595 # Set up our local repo for caching remote objects, making
597 if (!-d $local_repo) {
598 make_path($local_repo) or croak("Error: could not create $local_repo");
600 # This works (exits 0 and doesn't delete fetched objects) even
601 # if $local_repo is already initialized:
602 `$gitcmd init --bare`;
604 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
607 # If $treeish looks like a hash (or abbrev hash) we look it up in
608 # our local cache first, since that's cheaper. (We don't want to
609 # do that with tags/branches though -- those change over time, so
610 # they should always be resolved by the remote repo.)
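# ("Looks like a hash" means 7 to 40 lowercase hex digits, matching the
# test below.)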
611 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
612 # Hide stderr because it's normal for this to fail:
613 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
615 # Careful not to resolve a branch named abcdeff to commit 1234567:
616 $sha1 =~ /^$treeish/ &&
617 $sha1 =~ /^([0-9a-f]{40})$/s) {
619 Log(undef, "Commit $commit already present in $local_repo");
623 if (!defined $commit) {
624 # If $treeish isn't just a hash or abbrev hash, or isn't here
625 # yet, we need to fetch the remote to resolve it correctly.
627 # First, remove all local heads. This prevents a name that does
628 # not exist on the remote from resolving to (or colliding with)
629 # a previously fetched branch or tag (possibly from a different
631 remove_tree("$local_repo/refs/heads", {keep_root => 1});
633 Log(undef, "Fetching objects from $repo to $local_repo");
634 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
636 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
640 # Now that the data is all here, we will use our local repo for
641 # the rest of our git activities.
645 my $gitcmd = "git --git-dir=\Q$repo\E";
646 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
647 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
648 croak("`$gitcmd rev-list` exited "
650 .", '$treeish' not found, giving up");
653 Log(undef, "Version $treeish is commit $commit");
655 if ($commit ne $Job->{'script_version'}) {
656 # Record the real commit id in the database, frozentokey, logs,
657 # etc. -- instead of an abbreviation or a branch name which can
658 # become ambiguous or point to a different commit in the future.
659 if (!$Job->update_attributes('script_version' => $commit)) {
660 croak("Error: failed to update job's script_version attribute");
664 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
665 add_git_archive("$gitcmd archive ''\Q$commit\E");
668 my $git_archive = combined_git_archive();
669 if (!defined $git_archive) {
670 Log(undef, "Skip install phase (no git archive)");
672 Log(undef, "Warning: This probably means workers have no source tree!");
677 my $install_script_tries_left = 3;
678 for (my $attempts = 0; $attempts < 3; $attempts++) {
679 my @srunargs = ("srun",
680 "--nodelist=$nodelist",
681 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
682 my @execargs = ("sh", "-c",
683 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
685 $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
686 my ($stdout, $stderr, $tempfail);
687 ($exited, $stdout, $stderr, $tempfail) = srun_sync(
688 \@srunargs, \@execargs,
689 {label => "run install script on all workers"},
690 $build_script . $git_archive);
692 exit_retry_unlocked();
695 my $stderr_anything_from_script = 0;
696 for my $line (split(/\n/, $stderr)) {
697 if ($line !~ /^(srun: error: |starting: \[)/) {
698 $stderr_anything_from_script = 1;
702 last if $exited == 0 || $main::please_freeze;
704 # If the install script fails but doesn't print an error message,
705 # the next thing anyone is likely to do is just run it again in
706 # case it was a transient problem like "slurm communication fails
707 # because the network isn't reliable enough". So we'll just do
708 # that ourselves (up to 3 attempts in total). OTOH, if there is an
709 # error message, the problem is more likely to have a real fix and
710 # we should fail the job so the fixing process can start, instead
711 # of doing 2 more attempts.
712 last if $stderr_anything_from_script;
715 foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
716 unlink($tar_filename);
724 foreach (qw (script script_version script_parameters runtime_constraints))
728 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
730 foreach (split (/\n/, $Job->{knobs}))
732 Log (undef, "knob " . $_);
736 'filters' => [['hostname', 'in', \@node]],
737 'order' => 'hostname',
738 'limit' => scalar(@node),
740 for my $n (@{$resp->{items}}) {
741 Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
746 $main::success = undef;
752 my $thisround_succeeded = 0;
753 my $thisround_failed = 0;
754 my $thisround_failed_multiple = 0;
755 my $working_slot_count = scalar(@slot);
757 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
758 or $a <=> $b } @jobstep_todo;
759 my $level = $jobstep[$jobstep_todo[0]]->{level};
761 my $initial_tasks_this_level = 0;
762 foreach my $id (@jobstep_todo) {
763 $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
766 # If the number of tasks scheduled at this level #T is smaller than the number
767 # of slots available #S, only use the first #T slots, or the first slot on
768 # each node, whichever number is greater.
770 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
771 # based on these numbers. Using fewer slots makes more resources available
772 # to each individual task, which should normally be a better strategy when
773 # there are fewer of them running with less parallelism.
775 # Note that this calculation is not redone if the initial tasks at
776 # this level queue more tasks at the same level. This may harm
777 # overall task throughput for that level.
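# For example, with 3 allocated nodes of 8 slots each (24 slots total):
# 2 initial tasks get 3 free slots (one per node), 10 initial tasks get
# the first 10 slots, and 30 initial tasks get all 24 slots.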
779 if ($initial_tasks_this_level < @node) {
780 @freeslot = (0..$#node);
781 } elsif ($initial_tasks_this_level < @slot) {
782 @freeslot = (0..$initial_tasks_this_level - 1);
784 @freeslot = (0..$#slot);
786 my $round_num_freeslots = scalar(@freeslot);
787 print STDERR "crunch-job has ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
789 my %round_max_slots = ();
790 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
791 my $this_slot = $slot[$freeslot[$ii]];
792 my $node_name = $this_slot->{node}->{name};
793 $round_max_slots{$node_name} ||= $this_slot->{cpu};
794 last if (scalar(keys(%round_max_slots)) >= @node);
797 Log(undef, "start level $level with $round_num_freeslots slots");
800 my $progress_is_dirty = 1;
801 my $progress_stats_updated = 0;
803 update_progress_stats();
807 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
809 # Don't create new tasks if we already know the job's final result.
810 last if defined($main::success);
812 my $id = $jobstep_todo[$todo_ptr];
813 my $Jobstep = $jobstep[$id];
814 if ($Jobstep->{level} != $level)
819 pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
820 set_nonblocking($reader{$id});
822 my $childslot = $freeslot[0];
823 my $childnode = $slot[$childslot]->{node};
824 my $childslotname = join (".",
825 $slot[$childslot]->{node}->{name},
826 $slot[$childslot]->{cpu});
828 my $childpid = fork();
831 $SIG{'INT'} = 'DEFAULT';
832 $SIG{'QUIT'} = 'DEFAULT';
833 $SIG{'TERM'} = 'DEFAULT';
835 foreach (values (%reader))
839 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
840 open(STDOUT,">&writer") or croak ($!);
841 open(STDERR,">&writer") or croak ($!);
846 delete $ENV{"GNUPGHOME"};
847 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
848 $ENV{"TASK_QSEQUENCE"} = $id;
849 $ENV{"TASK_SEQUENCE"} = $level;
850 $ENV{"JOB_SCRIPT"} = $Job->{script};
851 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
852 $param =~ tr/a-z/A-Z/;
853 $ENV{"JOB_PARAMETER_$param"} = $value;
855 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
856 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
857 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
858 $ENV{"HOME"} = $ENV{"TASK_WORK"};
859 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
860 $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
861 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
863 my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
869 "--nodelist=".$childnode->{name},
870 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
871 "--job-name=$job_id.$id.$$",
874 my $stdbuf = " stdbuf --output=0 --error=0 ";
876 my $arv_file_cache = "";
877 if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
878 $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
882 "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
883 ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
884 ."&& cd \Q$ENV{CRUNCH_TMP}\E "
885 # These environment variables get used explicitly later in
886 # $command. No tool is expected to read these values directly.
887 .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
888 .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
889 ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
890 ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
891 .q{&& declare -a VOLUMES=() }
892 .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
893 .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
894 .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
896 $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
897 $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
898 $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
902 my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
903 my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
904 $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
905 $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
906 # We only set memory limits if Docker lets us limit both memory and swap.
907 # Memory limits alone have been supported longer, but subprocesses tend
908 # to get SIGKILL if they exceed that without any swap limit set.
909 # See #5642 for additional background.
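# As a worked example (hypothetical numbers): on a node reporting
# MemTotal 67108864 kB (64 GiB) with CRUNCH_NODE_SLOTS=8, MEMLIMIT is
# 67108864*95/800 = 7969177 kB (about 7.6 GiB) per task, and with
# 2 GiB of swap SWAPLIMIT is about 9.6 GiB.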
910 if ($docker_limitmem) {
911 $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
914 # The source tree and $destdir directory (which we have
915 # installed on the worker host) are available in the container,
916 # under the same path.
917 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
918 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
920 # Currently, we make the "by_pdh" directory in arv-mount's mount
921 # point appear at /keep inside the container (instead of using
922 # the same path as the host like we do with CRUNCH_SRC and
923 # CRUNCH_INSTALL). However, crunch scripts and utilities must
924 # not rely on this. They must use $TASK_KEEPMOUNT.
925 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
926 $ENV{TASK_KEEPMOUNT} = "/keep";
928 # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
929 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
930 $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
932 # TASK_WORK is almost exactly like a docker data volume: it
933 # starts out empty, is writable, and persists until no
934 # containers use it any more. We don't use --volumes-from to
935 # share it with other containers: it is only accessible to this
936 # task, and it goes away when this task stops.
938 # However, a docker data volume is writable only by root unless
939 # the mount point already happens to exist in the container with
940 # different permissions. Therefore, we [1] assume /tmp already
941 # exists in the image and is writable by the crunch user; [2]
942 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
943 # writable if they are created by docker while setting up the
944 # other --volumes); and [3] create $TASK_WORK inside the
945 # container using $build_script.
946 $command .= "--volume=/tmp ";
947 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
948 $ENV{"HOME"} = $ENV{"TASK_WORK"};
949 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
951 # TODO: Share a single JOB_WORK volume across all task
952 # containers on a given worker node, and delete it when the job
953 # ends (and, in case that doesn't work, when the next job
956 # For now, use the same approach as TASK_WORK above.
957 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
959 # Bind mount the crunchrunner binary and host TLS certificates file into
961 $command .= '"${VOLUMES[@]}" ';
963 while (my ($env_key, $env_val) = each %ENV)
965 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
966 $command .= "--env=\Q$env_key=$env_val\E ";
969 $command .= "--env=\QHOME=$ENV{HOME}\E ";
970 $command .= "\Q$docker_hash\E ";
972 if ($Job->{arvados_sdk_version}) {
974 $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
976 $command .= "/bin/sh -c \'python -c " .
977 '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
978 ">&2 2>/dev/null; " .
979 "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
980 "if which stdbuf >/dev/null ; then " .
981 " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
983 " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
988 $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
990 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
993 my @execargs = ('bash', '-c', $command);
994 srun (\@srunargs, \@execargs, undef, $build_script);
995 # exec() failed, we assume nothing happened.
996 die "srun() failed on build script\n";
999 if (!defined $childpid)
1002 delete $reader{$id};
1006 $proc{$childpid} = {
1010 jobstepname => "$job_id.$id.$childpid",
1012 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1013 $slot[$childslot]->{pid} = $childpid;
1015 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1016 Log ($id, "child $childpid started on $childslotname");
1017 $Jobstep->{starttime} = time;
1018 $Jobstep->{node} = $childnode->{name};
1019 $Jobstep->{slotindex} = $childslot;
1020 delete $Jobstep->{stderr};
1021 delete $Jobstep->{finishtime};
1022 delete $Jobstep->{tempfail};
1024 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1025 retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1027 splice @jobstep_todo, $todo_ptr, 1;
1030 $progress_is_dirty = 1;
1034 ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1036 last THISROUND if $main::please_freeze;
1037 if ($main::please_info)
1039 $main::please_info = 0;
1041 create_output_collection();
1043 update_progress_stats();
1048 if (!$gotsome || ($latest_refresh + 2 < scalar time))
1050 check_refresh_wanted();
1052 update_progress_stats();
1054 elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1056 update_progress_stats();
1059 select (undef, undef, undef, 0.1);
1061 $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1062 $_->{node}->{hold_count} < 4 } @slot);
1063 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1064 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1066 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1067 .($thisround_failed+$thisround_succeeded)
1068 .") -- giving up on this round";
1069 Log (undef, $message);
1073 # move slots from freeslot to holdslot (or back to freeslot) if necessary
1074 for (my $i=$#freeslot; $i>=0; $i--) {
1075 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1076 push @holdslot, (splice @freeslot, $i, 1);
1079 for (my $i=$#holdslot; $i>=0; $i--) {
1080 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1081 push @freeslot, (splice @holdslot, $i, 1);
1085 # give up if no nodes are succeeding
1086 if ($working_slot_count < 1) {
1087 Log(undef, "Every node has failed -- giving up");
1094 push @freeslot, splice @holdslot;
1095 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1098 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1101 if ($main::please_continue) {
1102 $main::please_continue = 0;
1105 $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1107 if (!reapchildren())
1109 check_refresh_wanted();
1111 update_progress_stats();
1112 select (undef, undef, undef, 0.1);
1113 killem (keys %proc) if $main::please_freeze;
1117 update_progress_stats();
1118 freeze_if_want_freeze();
1121 if (!defined $main::success)
1123 if (!@jobstep_todo) {
1125 } elsif ($working_slot_count < 1) {
1126 save_output_collection();
1128 exit_retry_unlocked();
1129 } elsif ($thisround_succeeded == 0 &&
1130 ($thisround_failed == 0 || $thisround_failed > 4)) {
1131 my $message = "stop because $thisround_failed tasks failed and none succeeded";
1132 Log (undef, $message);
1137 goto ONELEVEL if !defined $main::success;
1140 release_allocation();
1142 my $collated_output = save_output_collection();
1143 Log (undef, "finish");
1145 my $final_log = save_meta();
1148 if ($collated_output && $final_log && $main::success) {
1149 $final_state = 'Complete';
1151 $final_state = 'Failed';
1153 $Job->update_attributes('state' => $final_state);
1155 exit (($final_state eq 'Complete') ? 0 : 1);
1159 sub update_progress_stats
1161 $progress_stats_updated = time;
1162 return if !$progress_is_dirty;
1163 my ($todo, $done, $running) = (scalar @jobstep_todo,
1164 scalar @jobstep_done,
1165 scalar keys(%proc));
1166 $Job->{'tasks_summary'} ||= {};
1167 $Job->{'tasks_summary'}->{'todo'} = $todo;
1168 $Job->{'tasks_summary'}->{'done'} = $done;
1169 $Job->{'tasks_summary'}->{'running'} = $running;
1170 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1171 Log (undef, "status: $done done, $running running, $todo todo");
1172 $progress_is_dirty = 0;
1179 my $children_reaped = 0;
1180 my @successful_task_uuids = ();
1182 while((my $pid = waitpid (-1, WNOHANG)) > 0)
1184 my $childstatus = $?;
1186 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1188 . $slot[$proc{$pid}->{slot}]->{cpu});
1189 my $jobstepidx = $proc{$pid}->{jobstepidx};
1192 my $elapsed = time - $proc{$pid}->{time};
1193 my $Jobstep = $jobstep[$jobstepidx];
1195 my $exitvalue = $childstatus >> 8;
1196 my $exitinfo = "exit ".exit_status_s($childstatus);
1197 $Jobstep->{'arvados_task'}->reload;
1198 my $task_success = $Jobstep->{'arvados_task'}->{success};
1200 Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1202 if (!defined $task_success) {
1203 # task did not indicate one way or the other --> fail
1204 Log($jobstepidx, sprintf(
1205 "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1206 exit_status_s($childstatus)));
1207 $Jobstep->{'arvados_task'}->{success} = 0;
1208 retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1215 $temporary_fail ||= $Jobstep->{tempfail};
1216 $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1218 ++$thisround_failed;
1219 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1221 # Check for signs of a failed or misconfigured node
1222 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1223 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1224 # Don't count this against jobstep failure thresholds if this
1225 # node is already suspected faulty and srun exited quickly
1226 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1228 Log ($jobstepidx, "blaming failure on suspect node " .
1229 $slot[$proc{$pid}->{slot}]->{node}->{name});
1230 $temporary_fail ||= 1;
1232 ban_node_by_slot($proc{$pid}->{slot});
1235 Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1236 ++$Jobstep->{'failures'},
1237 $temporary_fail ? 'temporary' : 'permanent',
1240 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1241 # Give up on this task, and the whole job
1244 # Put this task back on the todo queue
1245 push @jobstep_todo, $jobstepidx;
1246 $Job->{'tasks_summary'}->{'failed'}++;
1250 push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1251 ++$thisround_succeeded;
1252 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1253 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1254 $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1255 push @jobstep_done, $jobstepidx;
1256 Log ($jobstepidx, "success in $elapsed seconds");
1258 $Jobstep->{exitcode} = $childstatus;
1259 $Jobstep->{finishtime} = time;
1260 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1261 retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1262 process_stderr_final ($jobstepidx);
1263 Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1264 length($Jobstep->{'arvados_task'}->{output}),
1265 $Jobstep->{'arvados_task'}->{output}));
1267 close $reader{$jobstepidx};
1268 delete $reader{$jobstepidx};
1269 delete $slot[$proc{$pid}->{slot}]->{pid};
1270 push @freeslot, $proc{$pid}->{slot};
1273 $progress_is_dirty = 1;
1276 if (scalar(@successful_task_uuids) > 0)
1278 Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1280 my $newtask_list = [];
1281 my $newtask_results;
1283 $newtask_results = api_call(
1285 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1286 'order' => 'qsequence',
1287 'offset' => scalar(@$newtask_list),
1289 push(@$newtask_list, @{$newtask_results->{items}});
1290 } while (@{$newtask_results->{items}});
1291 Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1292 foreach my $arvados_task (@$newtask_list) {
1294 'level' => $arvados_task->{'sequence'},
1296 'arvados_task' => $arvados_task
1298 push @jobstep, $jobstep;
1299 push @jobstep_todo, $#jobstep;
1303 return $children_reaped;
1306 sub check_refresh_wanted
1308 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1310 $stat[9] > $latest_refresh &&
1311 # ...and we have actually locked the job record...
1312 $job_id eq $Job->{'uuid'}) {
1313 $latest_refresh = scalar time;
1314 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1315 for my $attr ('cancelled_at',
1316 'cancelled_by_user_uuid',
1317 'cancelled_by_client_uuid',
1319 $Job->{$attr} = $Job2->{$attr};
1321 if ($Job->{'state'} ne "Running") {
1322 if ($Job->{'state'} eq "Cancelled") {
1323 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1325 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1328 $main::please_freeze = 1;
1335 my $last_squeue_check = $squeue_checked;
1337 # Do not call `squeue` or check the kill list more than once every
1339 return if $last_squeue_check > time - 15;
1340 $squeue_checked = time;
1342 # Look for children from which we haven't received stderr data since
1343 # the last squeue check. If no such children exist, all procs are
1344 # alive and there's no need to even look at squeue.
1346 # As long as the crunchstat poll interval (10s) is shorter than the
1347 # squeue check interval (15s) this should make the squeue check an
1349 my $silent_procs = 0;
1350 for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1352 if (!exists($js->{stderr_at}))
1354 $js->{stderr_at} = 0;
1356 if ($js->{stderr_at} < $last_squeue_check)
1361 return if $silent_procs == 0;
1363 # use killem() on procs whose killtime is reached
1364 while (my ($pid, $procinfo) = each %proc)
1366 my $js = $jobstep[$procinfo->{jobstepidx}];
1367 if (exists $procinfo->{killtime}
1368 && $procinfo->{killtime} <= time
1369 && $js->{stderr_at} < $last_squeue_check)
1372 if ($js->{stderr_at}) {
1373 $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1375 Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1382 # here is an opportunity to check for mysterious problems with local procs
1386 # Get a list of steps still running. Note: squeue(1) says --steps
1387 # selects a format (which we override anyway) and allows us to
1388 # specify which steps we're interested in (which we don't).
1389 # Importantly, it also changes the meaning of %j from "job name" to
1390 # "step name" and (although this isn't mentioned explicitly in the
1391 # docs) switches from "one line per job" mode to "one line per step"
1392 # mode. Without it, we'd just get a list of one job, instead of a
1394 my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1397 Log(undef, "warning: squeue exit status $? ($!)");
1402 # which of my jobsteps are running, according to squeue?
1404 for my $jobstepname (@squeue)
1406 $ok{$jobstepname} = 1;
1409 # Check for child procs >60s old and not mentioned by squeue.
1410 while (my ($pid, $procinfo) = each %proc)
1412 if ($procinfo->{time} < time - 60
1413 && $procinfo->{jobstepname}
1414 && !exists $ok{$procinfo->{jobstepname}}
1415 && !exists $procinfo->{killtime})
1417 # According to slurm, this task has ended (successfully or not)
1418 # -- but our srun child hasn't exited. First we must wait (30
1419 # seconds) in case this is just a race between communication
1420 # channels. Then, if our srun child process still hasn't
1421 # terminated, we'll conclude some slurm communication
1422 # error/delay has caused the task to die without notifying srun,
1423 # and we'll kill srun ourselves.
1424 $procinfo->{killtime} = time + 30;
1425 Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1432 # If a node fails in a multi-node "srun" call during job setup, the call
1433 # may hang instead of exiting with a nonzero code. This function checks
1434 # "sinfo" for the health of the nodes that were allocated and ensures that
1435 # they are all still in the "alloc" state. If a node that is allocated to
1436 # this job is not in "alloc" state, then set please_freeze.
1438 # This is only called from srun_sync() for node configuration. If a
1439 # node fails doing actual work, there are other recovery mechanisms.
1441 # Do not call `sinfo` more than once every 15 seconds.
1442 return if $sinfo_checked > time - 15;
1443 $sinfo_checked = time;
1445 # The output format "%t" means output node states.
1446 my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
1449 Log(undef, "warning: sinfo exit status $? ($!)");
1456 if ($_ !~ /^alloc\*?\s*$/) {
1457 $main::please_freeze = 1;
1462 sub release_allocation
1466 Log (undef, "release job allocation");
1467 system "scancel $ENV{SLURM_JOB_ID}";
1476 my $sel = IO::Select->new();
1477 foreach my $jobstepidx (keys %reader)
1479 my $fd = $reader{$jobstepidx};
1481 $fd_job{$fd} = $jobstepidx;
1483 if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1484 $sel->add($stdout_fd);
1485 $fd_job{$stdout_fd} = $jobstepidx;
1488 # select on all reader fds with 0.1s timeout
1489 my @ready_fds = $sel->can_read(0.1);
1490 foreach my $fd (@ready_fds)
1493 if (0 < sysread ($fd, $buf, 65536))
1496 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1498 my $jobstepidx = $fd_job{$fd};
1499 if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1500 $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1504 $jobstep[$jobstepidx]->{stderr_at} = time;
1505 $jobstep[$jobstepidx]->{stderr} .= $buf;
1507 # Consume everything up to the last \n
1508 preprocess_stderr ($jobstepidx);
1510 if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1512 # If we get a lot of stderr without a newline, chop off the
1513 # front to avoid letting our buffer grow indefinitely.
1514 substr ($jobstep[$jobstepidx]->{stderr},
1515 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1523 # Consume all full lines of stderr for a jobstep. Everything after the
1524 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1526 sub preprocess_stderr
1528 my $jobstepidx = shift;
1529 # slotindex is only defined for children running Arvados job tasks.
1530 # Be prepared to handle the undef case (for setup srun calls, etc.).
1531 my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1533 while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1535 substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1536 Log ($jobstepidx, "stderr $line");
1537 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
1538 # If the allocation is revoked, we can't possibly continue, so mark all
1539 # nodes as failed. This will cause the overall exit code to be
1540 # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
1542 $main::please_freeze = 1;
1543 foreach my $st (@slot) {
1544 $st->{node}->{fail_count}++;
1547 elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
1548 $jobstep[$jobstepidx]->{tempfail} = 1;
1549 if (defined($job_slot_index)) {
1550 $slot[$job_slot_index]->{node}->{fail_count}++;
1551 ban_node_by_slot($job_slot_index);
1554 elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
1555 $jobstep[$jobstepidx]->{tempfail} = 1;
1556 ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
1558 elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1559 $jobstep[$jobstepidx]->{tempfail} = 1;
1565 sub process_stderr_final
1567 my $jobstepidx = shift;
1568 preprocess_stderr ($jobstepidx);
1571 Log ($jobstepidx, "stderr $_");
1572 } split ("\n", $jobstep[$jobstepidx]->{stderr});
1573 $jobstep[$jobstepidx]->{stderr} = '';
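# fetch_block: read the content at the given Keep locator by running
# arv-get (with retries); returns the data, or undef if arv-get fails.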
1580 if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1581 Log(undef, "fetch_block run error from arv-get $hash: $!");
1584 my $output_block = "";
1587 my $bytes = sysread($keep, $buf, 1024 * 1024);
1588 if (!defined $bytes) {
1589 Log(undef, "fetch_block read error from arv-get: $!");
1590 $output_block = undef;
1592 } elsif ($bytes == 0) {
1593 # sysread returns 0 at the end of the pipe.
1596 # some bytes were read into buf.
1597 $output_block .= $buf;
1602 Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1603 $output_block = undef;
1605 return $output_block;
1608 # Create a collection by concatenating the output of all tasks (each
1609 # task's output is either a manifest fragment, a locator for a
1610 # manifest fragment stored in Keep, or nothing at all). Return the
1611 # portable_data_hash of the new collection.
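# A task output here is either literal manifest text, e.g.
# ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n", or a bare
# locator matching /^[0-9a-f]{32}(\+\S+)*$/ naming a block that holds
# such text (resolved with fetch_block above).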
1612 sub create_output_collection
1614 Log (undef, "collate");
1616 my ($child_out, $child_in);
1617 my $pid = open2($child_out, $child_in, 'python', '-c', q{
1620 print (arvados.api("v1").collections().
1621 create(body={"manifest_text": sys.stdin.read(),
1622 "owner_uuid": sys.argv[2]}).
1623 execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1624 }, retry_count(), $Job->{owner_uuid});
1627 my $manifest_size = 0;
1631 my $output = $_->{'arvados_task'}->{output};
1632 next if (!defined($output));
1634 if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1635 $next_write = fetch_block($output);
1637 $next_write = $output;
1639 if (defined($next_write)) {
1640 if (!defined(syswrite($child_in, $next_write))) {
1641 # There's been an error writing. Stop the loop.
1642 # We'll log details about the exit code later.
1645 $manifest_size += length($next_write);
1648 my $uuid = $_->{'arvados_task'}->{'uuid'};
1649 Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1654 Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1657 my $s = IO::Select->new($child_out);
1658 if ($s->can_read(120)) {
1659 sysread($child_out, $joboutput, 1024 * 1024);
1662 Log(undef, "output collection creation exited " . exit_status_s($?));
1668 Log (undef, "timed out while creating output collection");
1669 foreach my $signal (2, 2, 2, 15, 15, 9) {
1670 kill($signal, $pid);
1671 last if waitpid($pid, WNOHANG) == -1;
1680 # Calls create_output_collection, logs the result, and returns it.
1681 # If that was successful, save that as the output in the job record.
1682 sub save_output_collection {
1683 my $collated_output = create_output_collection();
1685 if (!$collated_output) {
1686 Log(undef, "Failed to write output collection");
1689 Log(undef, "job output $collated_output");
1690 $Job->update_attributes('output' => $collated_output);
1692 return $collated_output;
1699 my $sig = 2; # SIGINT first
1700 if (exists $proc{$_}->{"sent_$sig"} &&
1701 time - $proc{$_}->{"sent_$sig"} > 4)
1703 $sig = 15; # SIGTERM if SIGINT doesn't work
1705 if (exists $proc{$_}->{"sent_$sig"} &&
1706 time - $proc{$_}->{"sent_$sig"} > 4)
1708 $sig = 9; # SIGKILL if SIGTERM doesn't work
1710 if (!exists $proc{$_}->{"sent_$sig"})
1712 Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1714 select (undef, undef, undef, 0.1);
1717 kill $sig, $_; # srun wants two SIGINT to really interrupt
1719 $proc{$_}->{"sent_$sig"} = time;
1720 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1730 vec($bits,fileno($_),1) = 1;
1736 # Send log output to Keep via arv-put.
1738 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1739 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1740 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1741 # $log_pipe_pid is the pid of the arv-put subprocess.
1743 # The only functions that should access these variables directly are:
1745 # log_writer_start($logfilename)
1746 # Starts an arv-put pipe, reading data on stdin and writing it to
1747 # a $logfilename file in an output collection.
1749 # log_writer_read_output([$timeout])
1750 # Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1751 # Passes $timeout to the select() call, with a default of 0.01.
1752 # Returns the result of the last read() call on $log_pipe_out, or
1753 # -1 if read() wasn't called because select() timed out.
1754 # Only other log_writer_* functions should need to call this.
1756 # log_writer_send($txt)
1757 # Writes $txt to the output log collection.
1759 # log_writer_finish()
1760 # Closes the arv-put pipe and returns the output that it produces.
1762 # log_writer_is_active()
1763 # Returns a true value if there is currently a live arv-put
1764 # process, false otherwise.
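#
# A minimal usage sketch, mirroring how crunch-job itself drives these
# helpers (the filename and message are illustrative):
#
#   log_writer_start("$job_id.log.txt");
#   log_writer_send("2017-01-01_00:00:00 $job_id $$ 0 hello\n");
#   my $log_manifest = log_writer_finish();  # manifest text from arv-put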
1766 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1769 sub log_writer_start($)
1771 my $logfilename = shift;
1772 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1776 '--filename', $logfilename,
1778 $log_pipe_out_buf = "";
1779 $log_pipe_out_select = IO::Select->new($log_pipe_out);
1782 sub log_writer_read_output {
1783 my $timeout = shift || 0.01;
1785 while ($read && $log_pipe_out_select->can_read($timeout)) {
1786 $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1787 length($log_pipe_out_buf));
1789 if (!defined($read)) {
1790 Log(undef, "error reading log manifest from arv-put: $!");
1795 sub log_writer_send($)
1798 print $log_pipe_in $txt;
1799 log_writer_read_output();
1802 sub log_writer_finish()
1804 return unless $log_pipe_pid;
1806 close($log_pipe_in);
1808 my $logger_failed = 0;
1809 my $read_result = log_writer_read_output(600);
1810 if ($read_result == -1) {
1811 $logger_failed = -1;
1812 Log (undef, "timed out reading from 'arv-put'");
1813 } elsif ($read_result != 0) {
1814 $logger_failed = -2;
1815 Log(undef, "failed to read arv-put log manifest to EOF");
1818 waitpid($log_pipe_pid, 0);
1820 $logger_failed ||= $?;
1821 Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1824 close($log_pipe_out);
1825 my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1826 $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1827 $log_pipe_out_select = undef;
1829 return $arv_put_output;
1832 sub log_writer_is_active() {
1833 return $log_pipe_pid;
1836 sub Log # ($jobstepidx, $logmessage)
1838 my ($jobstepidx, $logmessage) = @_;
1839 if ($logmessage =~ /\n/) {
1840 for my $line (split (/\n/, $_[1])) {
1841 Log ($jobstepidx, $line);
1845 my $fh = select STDERR; $|=1; select $fh;
1847 if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1848 $task_qseq = $jobstepidx;
1850 my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
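# Escape anything outside printable ASCII (space through "~", octal
# 040-176) as a \NNN octal sequence; e.g. a tab is logged as \011.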
1851 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1854 if (log_writer_is_active() || -t STDERR) {
1855 my @gmtime = gmtime;
1856 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1857 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1859 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1861 if (log_writer_is_active()) {
1862 log_writer_send($datetime . " " . $message);
1869 my ($package, $file, $line) = caller;
1870 my $message = "@_ at $file line $line\n";
1871 Log (undef, $message);
1872 release_allocation();
1873 freeze() if @jobstep_todo;
1874 create_output_collection() if @jobstep_todo;
1884 if ($Job->{'state'} eq 'Cancelled') {
1885 $Job->update_attributes('finished_at' => scalar gmtime);
1887 $Job->update_attributes('state' => 'Failed');
1894 my $justcheckpoint = shift; # false if this will be the last meta saved
1895 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1896 return unless log_writer_is_active();
1897 my $log_manifest = log_writer_finish();
1898 return unless defined($log_manifest);
1901 my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1902 $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1905 my $log_coll = api_call(
1906 "collections/create", ensure_unique_name => 1, collection => {
1907 manifest_text => $log_manifest,
1908 owner_uuid => $Job->{owner_uuid},
1909 name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1911 Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1912 $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1914 return $log_coll->{portable_data_hash};
1918 sub freeze_if_want_freeze
1920 if ($main::please_freeze)
1922 release_allocation();
1925 # kill some srun procs before freeze+stop
1926 map { $proc{$_} = {} } @_;
1929 killem (keys %proc);
1930 select (undef, undef, undef, 0.1);
1932 while (($died = waitpid (-1, WNOHANG)) > 0)
1934 delete $proc{$died};
1939 create_output_collection();
1949 Log (undef, "Freeze not implemented");
1956 croak ("Thaw not implemented");
1972 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
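# srun_sync: run a command (wrapped in srun when running under slurm),
# wait for it while still servicing pipes, squeue checks and
# freeze/refresh requests, and return ($exited, $stdout, $stderr,
# $tempfail). An optional fourth argument is written to the command's
# stdin. A typical call, as used for the setup steps earlier in this file:
#
#   my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
#     ["srun", "--nodes=1"],
#     [$docker_bin, 'run', '--help'],
#     {label => "check --memory-swap feature"});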
1978 my $srunargs = shift;
1979 my $execargs = shift;
1980 my $opts = shift || {};
1983 my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1984 Log (undef, "$label: start");
1986 my ($stderr_r, $stderr_w);
1987 pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1989 my ($stdout_r, $stdout_w);
1990 pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1992 my $srunpid = fork();
1997 fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1998 fcntl($stdout_w, F_SETFL, 0) or croak($!);
1999 open(STDERR, ">&", $stderr_w) or croak ($!);
2000 open(STDOUT, ">&", $stdout_w) or croak ($!);
2001 srun ($srunargs, $execargs, $opts, $stdin);
2007 set_nonblocking($stderr_r);
2008 set_nonblocking($stdout_r);
2010 # Add entries to @jobstep and %proc so check_squeue() and
2011 # freeze_if_want_freeze() can treat it like a job task process.
2015 stderr_captured => '',
2016 stdout_r => $stdout_r,
2017 stdout_captured => '',
2019 my $jobstepidx = $#jobstep;
2021 jobstepidx => $jobstepidx,
2023 $reader{$jobstepidx} = $stderr_r;
2025 while ($srunpid != waitpid ($srunpid, WNOHANG)) {
2026 my $busy = readfrompipes();
2027 if (!$busy || ($latest_refresh + 2 < scalar time)) {
2028 check_refresh_wanted();
2033 select(undef, undef, undef, 0.1);
2035 killem(keys %proc) if $main::please_freeze;
2039 1 while readfrompipes();
2040 process_stderr_final ($jobstepidx);
2042 Log (undef, "$label: exit ".exit_status_s($exited));
2046 delete $proc{$srunpid};
2047 delete $reader{$jobstepidx};
2049 my $j = pop @jobstep;
2050 # If the srun showed signs of tempfail, ensure the caller treats that as a
2052 if ($main::please_freeze || $j->{tempfail}) {
2055 return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
2061 my $srunargs = shift;
2062 my $execargs = shift;
2063 my $opts = shift || {};
2065 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2067 $Data::Dumper::Terse = 1;
2068 $Data::Dumper::Indent = 0;
2069 my $show_cmd = Dumper($args);
2070 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2071 $show_cmd =~ s/\n/ /g;
2072 if ($opts->{fork}) {
2073 Log(undef, "starting: $show_cmd");
2074 } else {
2075 # This is a child process: parent is in charge of reading our
2076 # stderr and copying it to Log() if needed.
2077 warn "starting: $show_cmd\n";
2078 }
2080 if (defined $stdin) {
2081 my $child = open STDIN, "-|";
2082 defined $child or die "no fork: $!";
2083 if ($child == 0) {
2084 print $stdin or die $!;
2085 close STDOUT or die $!;
2086 exit 0;
2087 }
2088 }
2090 return system (@$args) if $opts->{fork};
2093 warn "ENV size is ".length(join(" ",%ENV));
2094 die "exec failed: $!: @$args";
2098 sub ban_node_by_slot {
2099 # Don't start any new jobsteps on this node for 60 seconds
2100 my $slotid = shift;
2101 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2102 $slot[$slotid]->{node}->{hold_count}++;
2103 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
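# Example (hypothetical slot index): a failure handler that just saw a
# transient error on the node occupying slot 0 could take it out of rotation
# for a minute:
#
#   ban_node_by_slot(0);
#
# The ban is advisory: it only bumps hold_until/hold_count on the node record,
# and the scheduling loop is expected to skip held nodes until the hold expires.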
2108 my ($lockfile, $error_message) = @_;
2109 open L, ">", $lockfile or croak("$lockfile: $!");
2110 if (!flock L, LOCK_EX|LOCK_NB) {
2111 croak("Can't lock $lockfile: $error_message\n");
2115 sub find_docker_image {
2116 # Given a Keep locator, check to see if it contains a Docker image.
2117 # If so, return its stream name and Docker hash.
2118 # If not, return undef for both values.
2119 my $locator = shift;
2120 my ($streamname, $filename);
2121 my $image = api_call("collections/get", uuid => $locator);
2123 foreach my $line (split(/\n/, $image->{manifest_text})) {
2124 my @tokens = split(/\s+/, $line);
2126 $streamname = shift(@tokens);
2127 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2128 if (defined($filename)) {
2129 return (undef, undef); # More than one file in the Collection.
2130 } else {
2131 $filename = (split(/:/, $filedata, 3))[2];
2136 if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
2137 return ($streamname, $1);
2139 return (undef, undef);
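# Example (hypothetical $image_locator): check whether a collection holds a
# single saved Docker image before trying to load it:
#
#   my ($stream, $docker_hash) = find_docker_image($image_locator);
#   if (!defined $docker_hash) {
#     croak("$image_locator does not appear to contain a Docker image");
#   }
#
# Both values come back undef unless the collection contains exactly one file
# named like "<64-hex-digit-hash>.tar" (optionally prefixed with "sha256:").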
2143 sub exit_retry_unlocked {
2144 Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
2145 exit(EX_RETRY_UNLOCKED);
2149 # Calculate the number of times an operation should be retried,
2150 # assuming exponential backoff, and that we're willing to retry as
2151 # long as tasks have been running. Enforce a minimum of 3 retries.
2152 my ($starttime, $endtime, $timediff, $retries);
2154 $starttime = $jobstep[0]->{starttime};
2155 $endtime = $jobstep[-1]->{finishtime};
2157 if (!defined($starttime)) {
2158 $timediff = 0;
2159 } elsif (!defined($endtime)) {
2160 $timediff = time - $starttime;
2161 } else {
2162 $timediff = ($endtime - $starttime) - (time - $endtime);
2163 }
2164 if ($timediff > 0) {
2165 $retries = int(log($timediff) / log(2));
2166 } else {
2167 $retries = 1; # Use the minimum.
2168 }
2169 return ($retries > 3) ? $retries : 3;
2170 }
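# Worked example: if the first task started about 1000 seconds before the last
# one finished (and we check immediately afterwards), $timediff is ~1000 and
# $retries = int(log(1000)/log(2)) = 9; a job whose tasks ran for only a few
# seconds falls back to the enforced minimum of 3.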
2173 # Pass in a function reference and a short description of the operation.
2174 # The function will be called with the remaining arguments.
2175 # If it dies, retry it with exponential backoff until it succeeds,
2176 # or until the current retry_count is exhausted. After each failure
2177 # that can be retried, the error and the time of the next attempt
2178 # are logged via Log().
2179 my $operation = shift;
2180 my $op_text = shift;
2181 my $retries = retry_count();
2182 my $retry_callback = sub {
2183 my ($try_count, $next_try_at, $errmsg) = @_;
2184 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2185 $errmsg =~ s/\s/ /g;
2186 $errmsg =~ s/\s+$//;
2187 my $retry_msg;
2188 if ($next_try_at < time) {
2189 $retry_msg = "Retrying.";
2190 } else {
2191 my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2192 $retry_msg = "Retrying at $next_try_fmt.";
2193 }
2194 Log(undef, "$op_text failed: $errmsg. $retry_msg");
2195 };
2196 foreach my $try_count (0..$retries) {
2197 my $next_try = time + (2 ** $try_count);
2198 my $result = eval { $operation->(@_); };
2199 if (!$@) {
2200 return $result;
2201 } elsif ($try_count < $retries) {
2202 $retry_callback->($try_count, $next_try, $@);
2203 my $sleep_time = $next_try - time;
2204 sleep($sleep_time) if ($sleep_time > 0);
2205 }
2206 }
2207 # Ensure the error message ends in a newline, so Perl doesn't add
2208 # retry_op's line number to it.
2209 chomp($@);
2210 die($@ . "\n");
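# Example (hypothetical $url): wrap a flaky external fetch so it is retried
# with exponential backoff (roughly 1s, 2s, 4s, ... between attempts) and
# logged via Log() on each retryable failure:
#
#   my $body = retry_op(sub {
#     my ($u) = @_;
#     my $out = `curl -fsS \Q$u\E`;
#     ($? == 0) or die "fetch failed: " . exit_status_s($?) . "\n";
#     return $out;
#   }, "fetching $url", $url);
#
# The trailing arguments ($url here) are passed through to the coderef on
# every attempt; the trailing "\n" on die() keeps Perl from appending its own
# line-number suffix to the logged error.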
2214 # Pass in a /-separated API method name, and arguments for it.
2215 # This function will call that method, retrying as needed until
2216 # the current retry_count is exhausted, with a log on the first failure.
2217 my $method_name = shift;
2218 my $method = $arv;
2219 foreach my $key (split(/\//, $method_name)) {
2220 $method = $method->{$key};
2221 }
2222 return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
2223 }
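# Example: the method name walks the API client handle ($arv above) key by
# key, so "collections/get" ends up calling $arv->{collections}->{get}
# ->execute(...), as in the call save_meta() makes earlier in this file:
#
#   my $coll = api_call("collections/get", uuid => $Job->{log});
#
# Failures are retried (and logged once) by retry_op with the usual backoff.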
2226 # Given a $?, return a human-readable exit code string like "0" or
2227 # "1" or "0 with signal 1" or "1 with signal 11".
2228 my $exitcode = shift;
2229 my $s = $exitcode >> 8;
2230 if ($exitcode & 0x7f) {
2231 $s .= " with signal " . ($exitcode & 0x7f);
2232 }
2233 if ($exitcode & 0x80) {
2234 $s .= " with core dump";
2235 }
2236 return $s;
2237 }
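# Worked examples: a wait status of 256 is exit code 1 with no signal, so this
# returns "1"; a status of 11 (killed by SIGSEGV, no core) returns
# "0 with signal 11"; setting the core-dump bit (0x80) appends " with core dump".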
2239 sub handle_readall {
2240 # Pass in a glob reference to a file handle.
2241 # Read all its contents and return them as a string.
2242 my $fh_glob_ref = shift;
2243 local $/ = undef;
2244 return <$fh_glob_ref>;
2247 sub tar_filename_n {
2248 my $n = shift;
2249 return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2252 sub add_git_archive {
2253 # Pass in a git archive command as a string or list, a la system().
2254 # This method will save its output to be included in the archive sent to the
2255 # build script.
2256 my $git_input;
2257 $git_tar_count++;
2258 if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2259 croak("Failed to save git archive: $!");
2260 }
2261 my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2262 close($git_input);
2263 waitpid($git_pid, 0);
2264 close(GIT_ARCHIVE);
2265 if ($?) {
2266 croak("Failed to save git archive: git exited " . exit_status_s($?));
2270 sub combined_git_archive {
2271 # Combine all saved tar archives into a single archive, then return its
2272 # contents in a string. Return undef if no archives have been saved.
2273 if ($git_tar_count < 1) {
2274 return undef;
2275 }
2276 my $base_tar_name = tar_filename_n(1);
2277 foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2278 my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2279 if ($tar_exit != 0) {
2280 croak("Error preparing build archive: tar -A exited " .
2281 exit_status_s($tar_exit));
2282 }
2283 }
2284 if (!open(GIT_TAR, "<", $base_tar_name)) {
2285 croak("Could not open build archive: $!");
2286 }
2287 my $tar_contents = handle_readall(\*GIT_TAR);
2288 close(GIT_TAR);
2289 return $tar_contents;
2290 }
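# Example: callers can treat an undef return as "nothing to ship":
#
#   if (defined(my $git_archive = combined_git_archive())) {
#     # append $git_archive after the dispatch script's __DATA__ section so the
#     # compute-node install step can read it back from the DATA handle.
#   }
#
# ($git_archive is a hypothetical variable name for the returned tar contents.)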
2292 sub set_nonblocking {
2293 my $fh = shift;
2294 my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2295 fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2296 }
2298 __DATA__
2301 # This is crunch-job's internal dispatch script. crunch-job running on the API
2302 # server invokes this script on individual compute nodes, or localhost if we're
2303 # running a job locally. It gets called in two modes:
2305 # * No arguments: Installation mode. Read a tar archive from the DATA
2306 # file handle; it includes the Crunch script's source code, and
2307 # maybe SDKs as well. Those should be installed in the proper
2308 # locations. This runs outside of any Docker container, so don't try to
2309 # introspect Crunch's runtime environment.
2311 # * With arguments: Crunch script run mode. This script should set up the
2312 # environment, then run the command specified in the arguments. This runs
2313 # inside the job's Docker container, if one is in use.
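# In other words, the mode switch is just a test on @ARGV (sketch, not literal
# code from this file):
#
#   if (@ARGV) {
#     # run mode: put the SDKs on PERLLIB/RUBYLIB/etc., then exec(@ARGV)
#   } else {
#     # installation mode: untar the DATA stream into $ENV{CRUNCH_SRC}
#   }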
2316 use File::Path qw( make_path remove_tree );
2317 use POSIX qw(getcwd);
2319 use constant TASK_TEMPFAIL => 111;
2321 # Map SDK subdirectories to the search-path environment variables they belong to.
2322 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
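# Example: "perl/lib" => "PERLLIB" means the run-mode loop further down will
# prepend "$install_dir/perl/lib" to $ENV{PERLLIB} (or set it outright if
# PERLLIB was empty) once that subdirectory has been installed.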
2324 my $destdir = $ENV{"CRUNCH_SRC"};
2325 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2326 my $repo = $ENV{"CRUNCH_SRC_URL"};
2327 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2328 my $job_work = $ENV{"JOB_WORK"};
2329 my $task_work = $ENV{"TASK_WORK"};
2331 open(STDOUT_ORIG, ">&", STDOUT);
2332 open(STDERR_ORIG, ">&", STDERR);
2334 for my $dir ($destdir, $job_work, $task_work) {
2335 if ($dir) {
2336 make_path $dir;
2337 -e $dir or die "Failed to create temporary directory ($dir): $!";
2342 remove_tree($task_work, {keep_root => 1});
2345 ### Crunch script run mode
2346 if (@ARGV) {
2347 # We want to do routine logging during task 0 only. This gives the user
2348 # the information they need, but avoids repeating the information for every
2349 # task.
2350 my $Log;
2351 if ($ENV{TASK_SEQUENCE} eq "0") {
2352 $Log = sub {
2353 my $msg = shift;
2354 printf STDERR_ORIG "[Crunch] $msg\n", @_;
2360 my $python_src = "$install_dir/python";
2361 my $venv_dir = "$job_work/.arvados.venv";
2362 my $venv_built = -e "$venv_dir/bin/activate";
2363 if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2364 shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2365 "--python=python2.7", $venv_dir);
2366 shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2368 $Log->("Built Python SDK virtualenv");
2371 my @pysdk_version_cmd = ("python", "-c",
2372 "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2374 $Log->("Running in Python SDK virtualenv");
2375 @pysdk_version_cmd = ();
2376 my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2377 @ARGV = ("/bin/sh", "-ec",
2378 ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2379 } elsif (-d $python_src) {
2380 $Log->("Warning: virtualenv not found inside Docker container default " .
2381 "\$PATH. Can't install Python SDK.");
2384 if (@pysdk_version_cmd) {
2385 open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2386 my $pysdk_version = <$pysdk_version_pipe>;
2387 close($pysdk_version_pipe);
2389 chomp($pysdk_version);
2390 $Log->("Using Arvados SDK version $pysdk_version");
2392 # A lot could've gone wrong here, but pretty much all of it means that
2393 # Python won't be able to load the Arvados SDK.
2394 $Log->("Warning: Arvados SDK not found");
2398 while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2399 my $sdk_path = "$install_dir/$sdk_dir";
2400 if (-d $sdk_path) {
2401 if ($ENV{$sdk_envkey}) {
2402 $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2403 } else {
2404 $ENV{$sdk_envkey} = $sdk_path;
2405 }
2406 $Log->("Arvados SDK added to %s", $sdk_envkey);
2410 exec(@ARGV);
2411 die "Cannot exec `@ARGV`: $!";
2414 ### Installation mode
2415 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2417 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2418 # This exact git archive (source + arvados sdk) is already installed
2419 # here, so there's no need to reinstall it.
2421 # We must consume our DATA section, though: otherwise the process
2422 # feeding it to us will get SIGPIPE.
2423 my $buf;
2424 while (read(DATA, $buf, 65536)) { }
2426 exit(0);
2427 }
2429 unlink "$destdir.archive_hash";
2433 # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2434 local $SIG{PIPE} = "IGNORE";
2435 warn "Extracting archive: $archive_hash\n";
2436 # --ignore-zeros is necessary sometimes: depending on how much NUL
2437 # padding tar -A put on our combined archive (which in turn depends
2438 # on the length of the component archives) tar without
2439 # --ignore-zeros will exit before consuming stdin and cause close()
2440 # to fail on the resulting SIGPIPE.
2441 if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2442 die "Error launching 'tar -xC $destdir': $!";
2444 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2445 # get SIGPIPE. We must feed it data incrementally.
2447 while (read(DATA, $tar_input, 65536)) {
2448 print TARX $tar_input;
2451 die "'tar -xC $destdir' exited $?: $!";
2457 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2459 foreach my $sdk_lang (("python",
2460 map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2461 if (-d "$sdk_root/$sdk_lang") {
2462 if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2463 die "Failed to install $sdk_lang SDK: $!";
2469 my $python_dir = "$install_dir/python";
2470 if ((-d $python_dir) and can_run("python2.7")) {
2471 open(my $egg_info_pipe, "-|",
2472 "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2473 my @egg_info_errors = <$egg_info_pipe>;
2474 close($egg_info_pipe);
2477 if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2478 # egg_info apparently failed because it couldn't ask git for a build tag.
2479 # Specify no build tag.
2480 open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2481 print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2482 close($pysdk_cfg);
2483 } else {
2484 my $egg_info_exit = $? >> 8;
2485 foreach my $errline (@egg_info_errors) {
2486 warn $errline;
2487 }
2488 warn "python setup.py egg_info failed: exit $egg_info_exit";
2489 exit ($egg_info_exit || 1);
2494 # Hide messages from the install script (unless it fails: shell_or_die
2495 # will show $destdir.log in that case).
2496 open(STDOUT, ">>", "$destdir.log") or die ($!);
2497 open(STDERR, ">&", STDOUT) or die ($!);
2499 if (-e "$destdir/crunch_scripts/install") {
2500 shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2501 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2503 shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2504 } elsif (-e "./install.sh") {
2505 shell_or_die (undef, "./install.sh", $install_dir);
2508 if ($archive_hash) {
2509 unlink "$destdir.archive_hash.new";
2510 symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2511 rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2517 my $command_name = shift;
2518 open(my $which, "-|", "which", $command_name) or die ($!);
2519 while (<$which>) { }
2520 close($which);
2521 return ($? == 0);
2526 my $exitcode = shift;
2528 if ($ENV{"DEBUG"}) {
2529 print STDERR "@_\n";
2530 }
2531 if (system (@_) != 0) {
2532 my $err = $!;
2533 my $code = $?;
2534 my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2535 open STDERR, ">&STDERR_ORIG";
2536 system ("cat $destdir.log >&2");
2537 warn "@_ failed ($err): $exitstatus";
2538 if (defined($exitcode)) {
2539 exit $exitcode;
2540 }
2541 else {
2542 exit (($code >> 8) || 1);