2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3 # Copyright (C) The Arvados Authors. All rights reserved.
5 # SPDX-License-Identifier: AGPL-3.0
9 crunch-job: Execute job steps, save snapshots as requested, collate output.
13 Obtain job details from Arvados, run tasks on compute nodes (typically
14 invoked by scheduler on controller):
16 crunch-job --job x-y-z --git-dir /path/to/repo/.git
18 Obtain job details from command line, run tasks on local machine
19 (typically invoked by application or developer on VM):
21 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
23 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
31 If the job is already locked, steal the lock and run it anyway.
35 Path to a .git directory (or a git URL) where the commit given in the
36 job's C<script_version> attribute is to be found. If this is I<not>
37 given, the job's C<repository> attribute will be used.
41 Arvados API authorization token to use during the course of the job.
45 Do not clear per-job/task temporary directories during initial job
46 setup. This can speed up development and debugging when running jobs
51 UUID of the job to run, or a JSON-encoded job resource without a
52 UUID. If the latter is given, a new job object will be created.
56 =head1 RUNNING JOBS LOCALLY
58 crunch-job's log messages appear on stderr along with the job tasks'
59 stderr streams. The log is saved in Keep at each checkpoint and when
62 If the job succeeds, the job's output locator is printed on stdout.
64 While the job is running, the following signals are accepted:
68 =item control-C, SIGINT, SIGQUIT
70 Save a checkpoint, terminate any job tasks that are running, and stop.
74 Save a checkpoint and continue.
78 Refresh node allocation (i.e., check whether any nodes have been added
79 or unallocated) and attributes of the Job record that should affect
80 behavior (e.g., cancel job if cancelled_at becomes non-nil).
88 use POSIX ':sys_wait_h';
89 use POSIX qw(strftime);
90 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
94 use Digest::MD5 qw(md5_hex);
100 use File::Path qw( make_path remove_tree );
102 use constant TASK_TEMPFAIL => 111;
103 use constant EX_TEMPFAIL => 75;
104 use constant EX_RETRY_UNLOCKED => 93;
# Scratch layout. TMPDIR defaults to /tmp. When CRUNCH_TMP is not already
# set it defaults to $TMPDIR/crunch-job, with a "-<uid>" suffix unless USER
# is "crunch" or the uid is 0. JOB_WORK ("work") and CRUNCH_INSTALL ("opt")
# live under CRUNCH_TMP.
106 $ENV{"TMPDIR"} ||= "/tmp";
107 unless (defined $ENV{"CRUNCH_TMP"}) {
108 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
109 if ($ENV{"USER"} ne "crunch" && $< != 0) {
110 # use a tmp dir unique for my uid
111 $ENV{"CRUNCH_TMP"} .= "-$<";
115 # Create the tmp directory if it does not exist
116 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
117 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
120 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
121 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
122 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# Best effort: mkdir's return value is ignored, so an already-existing
# JOB_WORK is not treated as an error.
123 mkdir ($ENV{"JOB_WORK"});
# Defaults for the cgroup/docker options, then parse the command line.
# An explicit --job-api-token (handled below) overrides ARVADOS_API_TOKEN
# from the environment.
132 my $cgroup_root = "/sys/fs/cgroup";
133 my $docker_bin = "docker.io";
134 my $docker_run_args = "";
135 GetOptions('force-unlock' => \$force_unlock,
136 'git-dir=s' => \$git_dir,
137 'job=s' => \$jobspec,
138 'job-api-token=s' => \$job_api_token,
139 'no-clear-tmp' => \$no_clear_tmp,
140 'resume-stash=s' => \$resume_stash,
141 'cgroup-root=s' => \$cgroup_root,
142 'docker-bin=s' => \$docker_bin,
143 'docker-run-args=s' => \$docker_run_args,
146 if (defined $job_api_token) {
147 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# SLURM is considered available only when both SLURM_JOB_ID and
# SLURM_NODELIST are present in the environment.
150 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
155 $main::ENV{CRUNCH_DEBUG} = 1;
159 $main::ENV{CRUNCH_DEBUG} = 0;
# Load the job. A --job value that looks like a UUID is fetched with
# jobs/get; anything else is decoded as a JSON job specification, from
# which a new job record is created further down.
162 my $arv = Arvados->new('apiVersion' => 'v1');
171 if ($jobspec =~ /^[-a-z\d]+$/)
173 # $jobspec is an Arvados UUID, not a JSON job specification
174 $Job = api_call("jobs/get", uuid => $jobspec);
179 $local_job = JSON::decode_json($jobspec);
183 # Make sure our workers (our slurm nodes, localhost, or whatever) are
184 # at least able to run basic commands: they aren't down or severely
187 if (($Job || $local_job)->{docker_image_locator}) {
188 $cmd = [$docker_bin, 'ps', '-q'];
190 Log(undef, "Sanity check is `@$cmd`");
191 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
192 ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
194 {label => "sanity check"});
196 Log(undef, "Sanity check failed: ".exit_status_s($exited));
199 Log(undef, "Sanity check OK");
# The current user's UUID is recorded as the lock holder of a locally
# created job.
202 my $User = api_call("users/current");
205 if (!$force_unlock) {
206 # Claim this job, and make sure nobody else does
207 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
209 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
# A local (JSON) job spec must supply these attributes. Croak, naming the
# first one that is missing or false.
218 foreach my $attr (qw(script script_version script_parameters)) {
219 croak ("No $attr specified") unless $local_job->{$attr}; }
# Create the job record from the local spec, already locked by the current
# user and in the Running state.
# NOTE(review): scalar-context gmtime yields a ctime-style string (e.g.
# "Thu Sep  9 12:00:00 2021"), not the ISO 8601 strftime format used for
# task started_at later on -- confirm the API parses it as intended.
222 $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
223 $local_job->{'started_at'} = gmtime;
224 $local_job->{'state'} = 'Running';
226 $Job = api_call("jobs/create", job => $local_job);
228 $job_id = $Job->{'uuid'};
# The job log file is named "<job uuid>.log.txt".
230 my $keep_logfile = $job_id . '.log.txt';
231 log_writer_start($keep_logfile);
# max_tasks_per_node defaults to 0. When the slot table is built, 0
# means "no per-node cap".
233 $Job->{'runtime_constraints'} ||= {};
234 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
235 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Describe the installed arvados-cli gem version(s), if any, for the
# startup log message. Stderr from `gem` is discarded.
237 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
239 $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
240 chomp($gem_versions);
241 chop($gem_versions); # Closing parentheses
246 "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
# Build the slot table. Each @sinfo entry has the form "<ncpus> <nodelist>",
# e.g. the local CPU count for localhost, or `sinfo` output for the SLURM
# node list. Every node then gets one slot per CPU.
248 Log (undef, "check slurm allocation");
251 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Fall back to 1 CPU if no "processor" lines are counted.
255 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
256 push @sinfo, "$localcpus localhost";
258 if (exists $ENV{SLURM_NODELIST})
260 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
264 my ($ncpus, $slurm_nodelist) = split;
# A non-zero max_tasks_per_node caps the number of slots per node.
265 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Consume the node list one comma-separated entry at a time. An entry may
# carry a bracketed range suffix such as "[1-3,5]", which is expanded
# into individual node names.
268 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
271 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
274 foreach (split (",", $ranges))
287 push @nodelist, map {
289 $n =~ s/\[[-,\d]+\]/$_/;
296 push @nodelist, $nodelist;
299 foreach my $nodename (@nodelist)
301 Log (undef, "node $nodename - $ncpus slots");
302 my $node = { name => $nodename,
304 # The number of consecutive times a task has been dispatched
305 # to this node and failed.
307 # The number of consecutive times that SLURM has reported
308 # a node failure since the last successful task.
310 # Don't dispatch work to this node until this time
311 # (in seconds since the epoch) has passed.
313 foreach my $cpu (1..$ncpus)
315 push @slot, { node => $node,
319 push @node, @nodelist;
324 # Ensure that we get one jobstep running on each allocated node before
325 # we start overloading nodes with concurrent steps
327 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
# Initialize the job's tasks_summary counters on the job record.
330 $Job->update_attributes(
331 'tasks_summary' => { 'failed' => 0,
336 Log (undef, "start");
# Signal handlers only set flags; TERM croaks instead. The main loop polls
# the flags: INT/QUIT/TSTP -> please_freeze, ALRM -> please_info,
# CONT -> please_continue, HUP -> please_refresh.
337 $SIG{'INT'} = sub { $main::please_freeze = 1; };
338 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
339 $SIG{'TERM'} = \&croak;
340 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
341 $SIG{'ALRM'} = sub { $main::please_info = 1; };
342 $SIG{'CONT'} = sub { $main::please_continue = 1; };
343 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
345 $main::please_freeze = 0;
346 $main::please_info = 0;
347 $main::please_continue = 0;
348 $main::please_refresh = 0;
349 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy NOCACHE* knobs (one NAME=VALUE per line of the job's knobs) into %ENV.
351 foreach (split (/\n/, $Job->{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/; }
# Expose the job UUID to tasks under both names.
352 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
353 $ENV{"JOB_UUID"} = $job_id;
356 my @jobstep_todo = ();
357 my @jobstep_done = ();
358 my @jobstep_tomerge = ();
359 my $jobstep_tomerge_level = 0;
360 my $squeue_checked = 0;
361 my $sinfo_checked = 0;
362 my $latest_refresh = scalar time;
# Restore frozen state with thaw() when thawedfromkey is set. The other
# branch (presumably the else) creates a single level-0 task and queues it
# as step 0.
366 if (defined $Job->{thawedfromkey})
368 thaw ($Job->{thawedfromkey});
372 my $first_task = api_call("job_tasks/create", job_task => {
373 'job_uuid' => $Job->{'uuid'},
378 push @jobstep, { 'level' => 0,
380 'arvados_task' => $first_task,
382 push @jobstep_todo, 0;
# NOTE(review): must_lock_now is defined elsewhere. Presumably it aborts
# with the given message if another crunch-job holds $CRUNCH_TMP/.lock.
388 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# The build script is this file's DATA section. It is later fed, with the
# git archive appended, to the install run on every node.
391 my $build_script = handle_readall(\*DATA);
392 my $nodelist = join(",", @node);
393 my $git_tar_count = 0;
395 if (!defined $no_clear_tmp) {
396 # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
397 # up work directories crunch_tmp/work, crunch_tmp/opt,
399 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
400 ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
402 arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
403 rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
405 {label => "clean work dirs"});
407 exit_retry_unlocked();
411 # If this job requires a Docker image, install that.
412 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
413 if ($docker_locator = $Job->{docker_image_locator}) {
414 Log (undef, "Install docker image $docker_locator");
415 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
418 croak("No Docker image hash found from locator $docker_locator");
420 Log (undef, "docker image hash is $docker_hash");
421 $docker_stream =~ s/^\.//;
# Shell script run on every node. If the `loaded` check (docker inspect
# reports the expected image ID) fails, it streams the image tarball from
# Keep via arv-get into `docker load`, then re-checks.
422 my $docker_install_script = qq{
424 id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
425 echo "image ID is \$id"
426 [[ \${id} = \Q$docker_hash\E ]]
428 if loaded >&2 2>/dev/null; then
429 echo >&2 "image is already present"
432 echo >&2 "docker image is not present; loading"
433 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
434 if ! loaded >&2; then
435 echo >&2 "`docker load` exited 0, but image is not found (!)"
438 echo >&2 "image loaded successfully"
441 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
442 ["srun", "--nodelist=" . join(',', @node)],
443 ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
444 {label => "load docker image"});
447 exit_retry_unlocked();
450 # Determine whether this version of Docker supports memory+swap limits.
451 ($exited, $stdout, $stderr, $tempfail) = srun_sync(
452 ["srun", "--nodes=1"],
453 [$docker_bin, 'run', '--help'],
454 {label => "check --memory-swap feature"});
456 exit_retry_unlocked();
# True iff `docker run --help` mentions --memory-swap.
458 $docker_limitmem = ($stdout =~ /--memory-swap/);
460 # Find a non-root Docker user to use.
461 # Tries the default user for the container, then 'crunch', then 'nobody',
462 # testing for whether the actual user id is non-zero. This defends against
463 # mistakes but not malice, but we intend to harden the security in the future
464 # so we don't want anyone getting used to their jobs running as root in their
466 my @tryusers = ("", "crunch", "nobody");
467 foreach my $try_user (@tryusers) {
470 if ($try_user eq "") {
471 $label = "check whether default user is UID 0";
474 $label = "check whether user '$try_user' is UID 0";
475 $try_user_arg = "--user=$try_user";
477 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
478 ["srun", "--nodes=1"],
480 "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
483 if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
484 $dockeruserarg = $try_user_arg;
485 if ($try_user eq "") {
486 Log(undef, "Container will run with default user");
488 Log(undef, "Container will run with $dockeruserarg");
491 } elsif ($tempfail) {
492 exit_retry_unlocked();
# No candidate reported a positive numeric UID.
496 if (!defined $dockeruserarg) {
# List only the named users that were tried. The "" entry in @tryusers
# stands for the image's default user, which the message already mentions.
497 croak("Could not find a user in container that is not UID 0 (tried default user, " . join(", ", grep { $_ ne "" } @tryusers) . ") or there was a problem running 'id' in the container.");
# Queue git archives for the install step: the Arvados SDK when
# arvados_sdk_version is set, and the crunch script source at the resolved
# commit.
500 if ($Job->{arvados_sdk_version}) {
501 # The job also specifies an Arvados SDK version. Add the SDKs to the
502 # tar file for the build script to install.
503 Log(undef, sprintf("Packing Arvados SDK version %s for installation",
504 $Job->{arvados_sdk_version}));
505 add_git_archive("git", "--git-dir=$git_dir", "archive",
506 "--prefix=.arvados.sdk/",
507 $Job->{arvados_sdk_version}, "sdk");
511 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
512 # If script_version looks like an absolute path, *and* the --git-dir
513 # argument was not given -- which implies we were not invoked by
514 # crunch-dispatch -- we will use the given path as a working
515 # directory instead of resolving script_version to a git commit (or
516 # doing anything else with git).
517 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
518 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
521 # Resolve the given script_version to a git commit sha1. Also, if
522 # the repository is remote, clone it into our local filesystem: this
523 # ensures "git archive" will work, and is necessary to reliably
524 # resolve a symbolic script_version like "master^".
525 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
527 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
529 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
531 # If we're running under crunch-dispatch, it will have already
532 # pulled the appropriate source tree into its own repository, and
533 # given us that repo's path as $git_dir.
535 # If we're running a "local" job, we might have to fetch content
536 # from a remote repository.
538 # (Currently crunch-dispatch gives a local path with --git-dir, but
539 # we might as well accept URLs there too in case it changes its
541 my $repo = $git_dir || $Job->{'repository'};
543 # Repository can be remote or local. If remote, we'll need to fetch it
544 # to a local dir before doing `git log` et al.
547 if ($repo =~ m{://|^[^/]*:}) {
548 # $repo is a git url we can clone, like git:// or https:// or
549 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
550 # not recognized here because distinguishing that from a local
551 # path is too fragile. If you really need something strange here,
552 # use the ssh:// form.
553 $repo_location = 'remote';
554 } elsif ($repo =~ m{^\.*/}) {
555 # $repo is a local path to a git index. We'll also resolve ../foo
556 # to ../foo/.git if the latter is a directory. To help
557 # disambiguate local paths from named hosted repositories, this
558 # form must be given as ./ or ../ if it's a relative path.
559 if (-d "$repo/.git") {
560 $repo = "$repo/.git";
562 $repo_location = 'local';
564 # $repo is none of the above. It must be the name of a hosted
566 my $arv_repo_list = api_call("repositories/list",
567 'filters' => [['name','=',$repo]]);
568 my @repos_found = @{$arv_repo_list->{'items'}};
569 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
571 Log(undef, "Repository '$repo' -> "
572 . join(", ", map { $_->{'uuid'} } @repos_found));
575 croak("Error: Found $n_found repositories with name '$repo'.");
577 $repo = $repos_found[0]->{'fetch_url'};
578 $repo_location = 'remote';
580 Log(undef, "Using $repo_location repository '$repo'");
581 $ENV{"CRUNCH_SRC_URL"} = $repo;
583 # Resolve given script_version (we'll call that $treeish here) to a
584 # commit sha1 ($commit).
585 my $treeish = $Job->{'script_version'};
587 if ($repo_location eq 'remote') {
588 # We minimize excess object-fetching by re-using the same bare
589 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
590 # just keep adding remotes to it as needed.
591 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
592 my $gitcmd = "git --git-dir=\Q$local_repo\E";
594 # Set up our local repo for caching remote objects, making
596 if (!-d $local_repo) {
597 make_path($local_repo) or croak("Error: could not create $local_repo");
599 # This works (exits 0 and doesn't delete fetched objects) even
600 # if $local_repo is already initialized:
601 `$gitcmd init --bare`;
603 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
606 # If $treeish looks like a hash (or abbrev hash) we look it up in
607 # our local cache first, since that's cheaper. (We don't want to
608 # do that with tags/branches though -- those change over time, so
609 # they should always be resolved by the remote repo.)
610 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
611 # Hide stderr because it's normal for this to fail:
612 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
614 # Careful not to resolve a branch named abcdeff to commit 1234567:
615 $sha1 =~ /^$treeish/ &&
616 $sha1 =~ /^([0-9a-f]{40})$/s) {
618 Log(undef, "Commit $commit already present in $local_repo");
622 if (!defined $commit) {
623 # If $treeish isn't just a hash or abbrev hash, or isn't here
624 # yet, we need to fetch the remote to resolve it correctly.
626 # First, remove all local heads. This prevents a name that does
627 # not exist on the remote from resolving to (or colliding with)
628 # a previously fetched branch or tag (possibly from a different
630 remove_tree("$local_repo/refs/heads", {keep_root => 1});
632 Log(undef, "Fetching objects from $repo to $local_repo");
633 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
635 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
639 # Now that the data is all here, we will use our local repo for
640 # the rest of our git activities.
644 my $gitcmd = "git --git-dir=\Q$repo\E";
645 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
646 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
647 croak("`$gitcmd rev-list` exited "
649 .", '$treeish' not found, giving up");
652 Log(undef, "Version $treeish is commit $commit");
654 if ($commit ne $Job->{'script_version'}) {
655 # Record the real commit id in the database, frozentokey, logs,
656 # etc. -- instead of an abbreviation or a branch name which can
657 # become ambiguous or point to a different commit in the future.
658 if (!$Job->update_attributes('script_version' => $commit)) {
659 croak("Error: failed to update job's script_version attribute");
663 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
664 add_git_archive("$gitcmd archive ''\Q$commit\E");
# Combine all queued archives. An undef result means there is nothing to
# install, and the install phase is skipped with a warning.
667 my $git_archive = combined_git_archive();
668 if (!defined $git_archive) {
669 Log(undef, "Skip install phase (no git archive)");
671 Log(undef, "Warning: This probably means workers have no source tree!");
# Run the install script on every allocated node, making at most 3 attempts.
# NOTE(review): $install_script_tries_left is not referenced on any visible
# line; the loop bound is the literal 3.
676 my $install_script_tries_left = 3;
677 for (my $attempts = 0; $attempts < 3; $attempts++) {
678 my @srunargs = ("srun",
679 "--nodelist=$nodelist",
680 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
# Create CRUNCH_INSTALL, cd into CRUNCH_TMP and run perl on stdin (the
# build script plus git archive). The paths are shell-quoted with \Q...\E,
# as elsewhere in this file, so directories containing spaces or shell
# metacharacters cannot break or alter the `sh -c` command.
681 my @execargs = ("sh", "-c",
682 "mkdir -p \Q$ENV{CRUNCH_INSTALL}\E && cd \Q$ENV{CRUNCH_TMP}\E && perl -");
# Export an MD5 of the combined git archive, then feed the build script
# followed by the archive to the install command on all workers.
684 $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
685 my ($stdout, $stderr, $tempfail);
686 ($exited, $stdout, $stderr, $tempfail) = srun_sync(
687 \@srunargs, \@execargs,
688 {label => "run install script on all workers"},
689 $build_script . $git_archive);
691 exit_retry_unlocked();
# Does stderr contain anything besides srun's own "srun: error: " lines
# and "starting: [" lines?
694 my $stderr_anything_from_script = 0;
695 for my $line (split(/\n/, $stderr)) {
696 if ($line !~ /^(srun: error: |starting: \[)/) {
697 $stderr_anything_from_script = 1;
701 last if $exited == 0 || $main::please_freeze;
703 # If the install script fails but doesn't print an error message,
704 # the next thing anyone is likely to do is just run it again in
705 # case it was a transient problem like "slurm communication fails
706 # because the network isn't reliable enough". So we'll just do
707 # that ourselves (up to 3 attempts in total). OTOH, if there is an
708 # error message, the problem is more likely to have a real fix and
709 # we should fail the job so the fixing process can start, instead
710 # of doing 2 more attempts.
711 last if $stderr_anything_from_script;
# Delete the numbered tar files (presumably written by add_git_archive).
714 foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
715 unlink($tar_filename);
# Log the job's main attributes (reference values JSON-encoded), its
# knobs, and the node records for the allocated hosts.
723 foreach (qw (script script_version script_parameters runtime_constraints))
727 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
729 foreach (split (/\n/, $Job->{knobs}))
731 Log (undef, "knob " . $_);
735 'filters' => [['hostname', 'in', \@node]],
736 'order' => 'hostname',
737 'limit' => scalar(@node),
739 for my $n (@{$resp->{items}}) {
740 Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
# undef means "no final verdict yet". The dispatch loop stops creating
# tasks as soon as it becomes defined.
745 $main::success = undef;
751 my $thisround_succeeded = 0;
752 my $thisround_failed = 0;
753 my $thisround_failed_multiple = 0;
754 my $working_slot_count = scalar(@slot);
# Order pending steps by level, then by step index. This round handles
# the lowest pending level.
756 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
757 or $a <=> $b } @jobstep_todo;
758 my $level = $jobstep[$jobstep_todo[0]]->{level};
760 my $initial_tasks_this_level = 0;
761 foreach my $id (@jobstep_todo) {
762 $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
765 # If the number of tasks scheduled at this level #T is smaller than the number
766 # of slots available #S, only use the first #T slots, or the first slot on
767 # each node, whichever number is greater.
769 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
770 # based on these numbers. Using fewer slots makes more resources available
771 # to each individual task, which should normally be a better strategy when
772 # there are fewer of them running with less parallelism.
774 # Note that this calculation is not redone if the initial tasks at
775 # this level queue more tasks at the same level. This may harm
776 # overall task throughput for that level.
778 if ($initial_tasks_this_level < @node) {
779 @freeslot = (0..$#node);
780 } elsif ($initial_tasks_this_level < @slot) {
781 @freeslot = (0..$initial_tasks_this_level - 1);
783 @freeslot = (0..$#slot);
785 my $round_num_freeslots = scalar(@freeslot);
786 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
# For each node, record the highest CPU number among this round's free
# slots. @slot is sorted by cpu, so scanning @freeslot backwards sees each
# node's highest slot first. Tasks receive it as CRUNCH_NODE_SLOTS.
788 my %round_max_slots = ();
789 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
790 my $this_slot = $slot[$freeslot[$ii]];
791 my $node_name = $this_slot->{node}->{name};
792 $round_max_slots{$node_name} ||= $this_slot->{cpu};
793 last if (scalar(keys(%round_max_slots)) >= @node);
796 Log(undef, "start level $level with $round_num_freeslots slots");
799 my $progress_is_dirty = 1;
800 my $progress_stats_updated = 0;
802 update_progress_stats();
# Dispatch each pending step at the current level to the first free slot
# by forking a child that builds the task environment and command, then
# execs srun.
806 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
808 # Don't create new tasks if we already know the job's final result.
809 last if defined($main::success);
811 my $id = $jobstep_todo[$todo_ptr];
812 my $Jobstep = $jobstep[$id];
813 if ($Jobstep->{level} != $level)
818 pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
819 set_nonblocking($reader{$id});
821 my $childslot = $freeslot[0];
822 my $childnode = $slot[$childslot]->{node};
823 my $childslotname = join (".",
824 $slot[$childslot]->{node}->{name},
825 $slot[$childslot]->{cpu});
# The child restores default INT/QUIT/TERM handling and sends its
# stdout/stderr into the pipe above. The parent reads that pipe in
# non-blocking mode.
827 my $childpid = fork();
830 $SIG{'INT'} = 'DEFAULT';
831 $SIG{'QUIT'} = 'DEFAULT';
832 $SIG{'TERM'} = 'DEFAULT';
834 foreach (values (%reader))
838 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
839 open(STDOUT,">&writer") or croak ($!);
840 open(STDERR,">&writer") or croak ($!);
# Task environment: identity, script parameters (JOB_PARAMETER_<UPPER>),
# slot placement and per-task work directory.
845 delete $ENV{"GNUPGHOME"};
846 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
847 $ENV{"TASK_QSEQUENCE"} = $id;
848 $ENV{"TASK_SEQUENCE"} = $level;
849 $ENV{"JOB_SCRIPT"} = $Job->{script};
850 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
851 $param =~ tr/a-z/A-Z/;
852 $ENV{"JOB_PARAMETER_$param"} = $value;
854 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
855 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
856 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
857 $ENV{"HOME"} = $ENV{"TASK_WORK"};
858 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
859 $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
860 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# The Keep mount point sits next to TASK_WORK, as "<TASK_WORK>.keep".
862 my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
868 "--nodelist=".$childnode->{name},
869 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
870 "--job-name=$job_id.$id.$$",
873 my $stdbuf = " stdbuf --output=0 --error=0 ";
875 my $arv_file_cache = "";
876 if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
877 $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
881 "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
882 ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
883 ."&& cd \Q$ENV{CRUNCH_TMP}\E "
884 # These environment variables get used explicitly later in
885 # $command. No tool is expected to read these values directly.
886 .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
887 .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
888 ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
889 ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
890 .q{&& declare -a VOLUMES=() }
891 .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
892 .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
893 .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
# Run the rest of the command under arv-mount, with Keep mounted at
# $keep_mnt (by_pdh and tmp subdirectories).
895 $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
896 $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
897 $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
# Docker path: the container is named after the task UUID and its failure
# count, and its cid file is handed to crunchstat for cgroup accounting.
901 my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
902 my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
903 $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
904 $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
905 # We only set memory limits if Docker lets us limit both memory and swap.
906 # Memory limits alone have been supported longer, but subprocesses tend
907 # to get SIGKILL if they exceed that without any swap limit set.
908 # See #5642 for additional background.
909 if ($docker_limitmem) {
910 $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
913 # The source tree and $destdir directory (which we have
914 # installed on the worker host) are available in the container,
915 # under the same path.
916 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
917 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
919 # Currently, we make the "by_pdh" directory in arv-mount's mount
920 # point appear at /keep inside the container (instead of using
921 # the same path as the host like we do with CRUNCH_SRC and
922 # CRUNCH_INSTALL). However, crunch scripts and utilities must
923 # not rely on this. They must use $TASK_KEEPMOUNT.
924 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
925 $ENV{TASK_KEEPMOUNT} = "/keep";
927 # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
928 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
929 $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
931 # TASK_WORK is almost exactly like a docker data volume: it
932 # starts out empty, is writable, and persists until no
933 # containers use it any more. We don't use --volumes-from to
934 # share it with other containers: it is only accessible to this
935 # task, and it goes away when this task stops.
937 # However, a docker data volume is writable only by root unless
938 # the mount point already happens to exist in the container with
939 # different permissions. Therefore, we [1] assume /tmp already
940 # exists in the image and is writable by the crunch user; [2]
941 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
942 # writable if they are created by docker while setting up the
943 # other --volumes); and [3] create $TASK_WORK inside the
944 # container using $build_script.
945 $command .= "--volume=/tmp ";
946 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
947 $ENV{"HOME"} = $ENV{"TASK_WORK"};
948 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
950 # TODO: Share a single JOB_WORK volume across all task
951 # containers on a given worker node, and delete it when the job
952 # ends (and, in case that doesn't work, when the next job
955 # For now, use the same approach as TASK_WORK above.
956 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
958 # Bind mount the crunchrunner binary and host TLS certificates file into
960 $command .= '"${VOLUMES[@]}" ';
962 while (my ($env_key, $env_val) = each %ENV)
964 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
965 $command .= "--env=\Q$env_key=$env_val\E ";
968 $command .= "--env=\QHOME=$ENV{HOME}\E ";
969 $command .= "\Q$docker_hash\E ";
# With arvados_sdk_version, run the crunch script through `perl -` inside
# the container. Otherwise, log the Python SDK version, create JOB_WORK
# and TASK_WORK, and exec the crunch script (unbuffered via stdbuf when
# available).
971 if ($Job->{arvados_sdk_version}) {
973 $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
975 $command .= "/bin/sh -c \'python -c " .
976 '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
977 ">&2 2>/dev/null; " .
978 "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
979 "if which stdbuf >/dev/null ; then " .
980 " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
982 " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
987 $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
# Shell-quote the script path, exactly as the Docker branch does. The
# command runs under `bash -c`, and both CRUNCH_SRC (possibly a
# user-supplied absolute script_version) and the job's `script` come from
# outside, so unquoted they could inject shell syntax or break on spaces.
989 $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
992 my @execargs = ('bash', '-c', $command);
993 srun (\@srunargs, \@execargs, undef, $build_script);
994 # exec() failed, we assume nothing happened.
995 die "srun() failed on build script\n";
# fork() failed: discard this step's pipe reader.
998 if (!defined $childpid)
1001 delete $reader{$id};
# Parent: record the child in %proc and claim the slot for it.
1005 $proc{$childpid} = {
1009 jobstepname => "$job_id.$id.$childpid",
1011 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1012 $slot[$childslot]->{pid} = $childpid;
1014 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1015 Log ($id, "child $childpid started on $childslotname");
1016 $Jobstep->{starttime} = time;
1017 $Jobstep->{node} = $childnode->{name};
1018 $Jobstep->{slotindex} = $childslot;
1019 delete $Jobstep->{stderr};
1020 delete $Jobstep->{finishtime};
1021 delete $Jobstep->{tempfail};
1023 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1024 retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
# The step is dispatched; remove it from the todo list.
1026 splice @jobstep_todo, $todo_ptr, 1;
1029 $progress_is_dirty = 1;
1033 ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1035 last THISROUND if $main::please_freeze;
1036 if ($main::please_info)
1038 $main::please_info = 0;
1040 create_output_collection();
1042 update_progress_stats();
# Check for refresh requests when $gotsome is false (presumably: nothing
# was read or reaped on this pass) or 2+ seconds after $latest_refresh.
# Otherwise update progress stats every 30s, or sooner when dirty.
1047 if (!$gotsome || ($latest_refresh + 2 < scalar time))
1049 check_refresh_wanted();
1051 update_progress_stats();
1053 elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1055 update_progress_stats();
1058 select (undef, undef, undef, 0.1);
# A slot counts as working while its node has no current failures and
# fewer than 4 holds.
1060 $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1061 $_->{node}->{hold_count} < 4 } @slot);
# Give up on this round when repeated failures dominate.
1062 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1063 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1065 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1066 .($thisround_failed+$thisround_succeeded)
1067 .") -- giving up on this round";
1068 Log (undef, $message);
1072 # move slots from freeslot to holdslot (or back to freeslot) if necessary
1073 for (my $i=$#freeslot; $i>=0; $i--) {
1074 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1075 push @holdslot, (splice @freeslot, $i, 1);
1078 for (my $i=$#holdslot; $i>=0; $i--) {
1079 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1080 push @freeslot, (splice @holdslot, $i, 1);
1084 # give up if no nodes are succeeding
1085 if ($working_slot_count < 1) {
1086 Log(undef, "Every node has failed -- giving up");
# Move every held slot back to the free list.
1093 push @freeslot, splice @holdslot;
1094 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1097 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1100 if ($main::please_continue) {
1101 $main::please_continue = 0;
1104 $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1106 if (!reapchildren())
1108 check_refresh_wanted();
1110 update_progress_stats();
1111 select (undef, undef, undef, 0.1);
1112 killem (keys %proc) if $main::please_freeze;
1116 update_progress_stats();
1117 freeze_if_want_freeze();
1120 if (!defined $main::success)
1122 if (!@jobstep_todo) {
1124 } elsif ($working_slot_count < 1) {
1125 save_output_collection();
1127 exit_retry_unlocked();
1128 } elsif ($thisround_succeeded == 0 &&
1129 ($thisround_failed == 0 || $thisround_failed > 4)) {
1130 my $message = "stop because $thisround_failed tasks failed and none succeeded";
1131 Log (undef, $message);
1136 goto ONELEVEL if !defined $main::success;
1139 release_allocation();
1141 my $collated_output = save_output_collection();
1142 Log (undef, "finish");
1144 my $final_log = save_meta();
1147 if ($collated_output && $final_log && $main::success) {
1148 $final_state = 'Complete';
1150 $final_state = 'Failed';
1152 $Job->update_attributes('state' => $final_state);
1154 exit (($final_state eq 'Complete') ? 0 : 1);
# Record the current todo/done/running counts in the job's tasks_summary,
# push the summary to the API, and log a one-line status.
# $progress_stats_updated is refreshed on every call. The API update only
# happens when $progress_is_dirty is set, and the flag is cleared afterwards.
1158 sub update_progress_stats
1160 $progress_stats_updated = time;
1161 return if !$progress_is_dirty;
1162 my ($todo, $done, $running) = (scalar @jobstep_todo,
1163 scalar @jobstep_done,
1164 scalar keys(%proc));
1165 $Job->{'tasks_summary'} ||= {};
1166 $Job->{'tasks_summary'}->{'todo'} = $todo;
1167 $Job->{'tasks_summary'}->{'done'} = $done;
1168 $Job->{'tasks_summary'}->{'running'} = $running;
1169 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1170 Log (undef, "status: $done done, $running running, $todo todo");
1171 $progress_is_dirty = 0;
# Reap every child that has already exited, without blocking
# (waitpid -1, WNOHANG). For each one, record the task's outcome, requeue
# or fail it, and free its slot. Afterwards, if any tasks succeeded, fetch
# the tasks they created from the API and queue them as new jobsteps.
# Returns $children_reaped.
1178 my $children_reaped = 0;
1179 my @successful_task_uuids = ();
1181 while((my $pid = waitpid (-1, WNOHANG)) > 0)
# Capture $? immediately, before later calls can overwrite it.
1183 my $childstatus = $?;
1185 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1187 . $slot[$proc{$pid}->{slot}]->{cpu});
1188 my $jobstepidx = $proc{$pid}->{jobstepidx};
# Drain whatever the child left in its pipes before judging the result.
1190 readfrompipes_after_exit ($jobstepidx);
1193 my $elapsed = time - $proc{$pid}->{time};
1194 my $Jobstep = $jobstep[$jobstepidx];
1196 my $exitvalue = $childstatus >> 8;
1197 my $exitinfo = "exit ".exit_status_s($childstatus);
# The task reports its outcome through its API record, so reload it to
# read 'success'.
1198 $Jobstep->{'arvados_task'}->reload;
1199 my $task_success = $Jobstep->{'arvados_task'}->{success};
1201 Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1203 if (!defined $task_success) {
1204 # task did not indicate one way or the other --> fail
1205 Log($jobstepidx, sprintf(
1206 "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1207 exit_status_s($childstatus)));
1208 $Jobstep->{'arvados_task'}->{success} = 0;
1209 retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
# Failure path: the failure counts as temporary if stderr parsing set
# tempfail or the task exited with TASK_TEMPFAIL.
1216 $temporary_fail ||= $Jobstep->{tempfail};
1217 $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
# Round counters. failed_multiple counts tasks that had already failed before.
1219 ++$thisround_failed;
1220 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1222 # Check for signs of a failed or misconfigured node
1223 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1224 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1225 # Don't count this against jobstep failure thresholds if this
1226 # node is already suspected faulty and srun exited quickly
1227 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1229 Log ($jobstepidx, "blaming failure on suspect node " .
1230 $slot[$proc{$pid}->{slot}]->{node}->{name});
1231 $temporary_fail ||= 1;
1233 ban_node_by_slot($proc{$pid}->{slot});
1236 Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1237 ++$Jobstep->{'failures'},
1238 $temporary_fail ? 'temporary' : 'permanent',
# A permanent failure, or a third failure of any kind, is final;
# anything else is requeued.
1241 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1242 # Give up on this task, and the whole job
1245 # Put this task back on the todo queue
1246 push @jobstep_todo, $jobstepidx;
1247 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: remember the task (to look for tasks it created), and reset
# the node's losing streak, hold, and fail count.
1251 push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1252 ++$thisround_succeeded;
1253 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1254 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1255 $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1256 push @jobstep_done, $jobstepidx;
1257 Log ($jobstepidx, "success in $elapsed seconds");
# Apparently common to both outcomes: record the exit status and finish
# time, save finished_at to the API, and log the task's output.
1259 $Jobstep->{exitcode} = $childstatus;
1260 $Jobstep->{finishtime} = time;
1261 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1262 retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1263 Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1264 length($Jobstep->{'arvados_task'}->{output}),
1265 $Jobstep->{'arvados_task'}->{output}));
# Release the child's stderr reader and return its slot to the free pool.
1267 close $reader{$jobstepidx};
1268 delete $reader{$jobstepidx};
1269 delete $slot[$proc{$pid}->{slot}]->{pid};
1270 push @freeslot, $proc{$pid}->{slot};
1273 $progress_is_dirty = 1;
# Page through the tasks created by the tasks that just succeeded, in
# qsequence order (offset = number already fetched), until an empty page
# comes back. Queue each one as a new jobstep whose level is the task's
# sequence.
1276 if (scalar(@successful_task_uuids) > 0)
1278 Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1280 my $newtask_list = [];
1281 my $newtask_results;
1283 $newtask_results = api_call(
1285 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1286 'order' => 'qsequence',
1287 'offset' => scalar(@$newtask_list),
1289 push(@$newtask_list, @{$newtask_results->{items}});
1290 } while (@{$newtask_results->{items}});
1291 Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1292 foreach my $arvados_task (@$newtask_list) {
1294 'level' => $arvados_task->{'sequence'},
1296 'arvados_task' => $arvados_task
1298 push @jobstep, $jobstep;
1299 push @jobstep_todo, $#jobstep;
1303 return $children_reaped;
# If the file named by $ENV{CRUNCH_REFRESH_TRIGGER} has an mtime newer than
# the last refresh, and the job record is ours ($job_id equals the job's
# uuid), re-fetch the job and copy the listed attributes onto $Job.
# If the job is then no longer "Running", log why (cancellation versus an
# unexpected state change) and request a freeze via $main::please_freeze.
1306 sub check_refresh_wanted
1308 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1310 $stat[9] > $latest_refresh &&
1311 # ...and we have actually locked the job record...
1312 $job_id eq $Job->{'uuid'}) {
1313 $latest_refresh = scalar time;
1314 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1315 for my $attr ('cancelled_at',
1316 'cancelled_by_user_uuid',
1317 'cancelled_by_client_uuid',
1319 $Job->{$attr} = $Job2->{$attr};
1321 if ($Job->{'state'} ne "Running") {
1322 if ($Job->{'state'} eq "Cancelled") {
1323 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1325 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1328 $main::please_freeze = 1;
# Find srun children whose slurm job step has disappeared, and kill them.
# Runs at most once every 15 seconds, and does nothing unless some child
# has been silent on stderr since the previous check. First, children whose
# 30-second grace period (killtime) has expired, and that are still
# silent, are killed. Then the remaining children are compared against
# `squeue`, and a grace period is started for any child that slurm no
# longer lists and that is more than 60 seconds old.
1335 my $last_squeue_check = $squeue_checked;
1337 # Do not call `squeue` or check the kill list more than once every
1339 return if $last_squeue_check > time - 15;
1340 $squeue_checked = time;
1342 # Look for children from which we haven't received stderr data since
1343 # the last squeue check. If no such children exist, all procs are
1344 # alive and there's no need to even look at squeue.
1346 # As long as the crunchstat poll interval (10s) is shorter than the
1347 # squeue check interval (15s) this should make the squeue check an
1349 my $silent_procs = 0;
1350 for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1352 if (!exists($js->{stderr_at}))
1354 $js->{stderr_at} = 0;
1356 if ($js->{stderr_at} < $last_squeue_check)
1361 return if $silent_procs == 0;
1363 # use killem() on procs whose killtime is reached
1364 while (my ($pid, $procinfo) = each %proc)
1366 my $js = $jobstep[$procinfo->{jobstepidx}];
1367 if (exists $procinfo->{killtime}
1368 && $procinfo->{killtime} <= time
1369 && $js->{stderr_at} < $last_squeue_check)
1372 if ($js->{stderr_at}) {
1373 $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1375 Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1382 # here is an opportunity to check for mysterious problems with local procs
1386 # Get a list of steps still running. Note: squeue(1) says --steps
1387 # selects a format (which we override anyway) and allows us to
1388 # specify which steps we're interested in (which we don't).
1389 # Importantly, it also changes the meaning of %j from "job name" to
1390 # "step name" and (although this isn't mentioned explicitly in the
1391 # docs) switches from "one line per job" mode to "one line per step"
1392 # mode. Without it, we'd just get a list of one job, instead of a
1394 my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1397 Log(undef, "warning: squeue exit status $? ($!)");
# NOTE(review): backtick output lines keep their trailing "\n". Unless they
# are chomped on a line not visible here, the %ok keys below will never
# equal $procinfo->{jobstepname}. Confirm.
1402 # which of my jobsteps are running, according to squeue?
1404 for my $jobstepname (@squeue)
1406 $ok{$jobstepname} = 1;
1409 # Check for child procs >60s old and not mentioned by squeue.
1410 while (my ($pid, $procinfo) = each %proc)
1412 if ($procinfo->{time} < time - 60
1413 && $procinfo->{jobstepname}
1414 && !exists $ok{$procinfo->{jobstepname}}
1415 && !exists $procinfo->{killtime})
1417 # According to slurm, this task has ended (successfully or not)
1418 # -- but our srun child hasn't exited. First we must wait (30
1419 # seconds) in case this is just a race between communication
1420 # channels. Then, if our srun child process still hasn't
1421 # terminated, we'll conclude some slurm communication
1422 # error/delay has caused the task to die without notifying srun,
1423 # and we'll kill srun ourselves.
1424 $procinfo->{killtime} = time + 30;
1425 Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1432 # If a node fails in a multi-node "srun" call during job setup, the call
1433 # may hang instead of exiting with a nonzero code. This function checks
1434 # "sinfo" for the health of the nodes that were allocated and ensures that
1435 # they are all still in the "alloc" state. If a node that is allocated to
1436 # this job is not in "alloc" state, then set please_freeze.
1438 # This is only called from srun_sync() for node configuration. If a
1439 # node fails doing actual work, there are other recovery mechanisms.
1441 # Do not call `sinfo` more than once every 15 seconds.
1442 return if $sinfo_checked > time - 15;
1443 $sinfo_checked = time;
1445 # The output format "%t" means output node states.
1446 my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
1449 Log(undef, "warning: sinfo exit status $? ($!)");
# $_ is presumably one line of sinfo output (the loop header is not visible
# here). The previous test used `!=`, which compares numerically. Every
# state string numifies to 0, so the condition could never be true and a
# failed node was never noticed. Compare the state text instead: \s*
# absorbs the "\n" that backtick output keeps, plus any padding.
# "alloc*" is still accepted, as before.
1456 if ($_ !~ /^\s*alloc\*?\s*$/) {
1457 $main::please_freeze = 1;
# Cancel this job's slurm allocation. The list form of system() passes
# $ENV{SLURM_JOB_ID} to scancel as a single argument without going through
# /bin/sh, matching the \Q...\E quoting used for the squeue and sinfo calls.
1462 sub release_allocation
1466 Log (undef, "release job allocation");
1467 system "scancel", $ENV{SLURM_JOB_ID};
# Poll every child's stderr pipe, plus the captured-stdout pipe when there
# is one, for up to 0.1 seconds. Captured stdout is appended to
# stdout_captured. Stderr is appended to the jobstep's stderr buffer (with
# stderr_at stamped), complete lines are handed to preprocess_stderr, and
# an unterminated remainder longer than 16384 bytes is cut to its last
# 8192 bytes.
# NOTE(review): the value callers get back (srun_sync uses it as $busy) is
# computed on lines not visible here.
1476 my $sel = IO::Select->new();
1477 foreach my $jobstepidx (keys %reader)
1479 my $fd = $reader{$jobstepidx};
1481 $fd_job{$fd} = $jobstepidx;
1483 if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1484 $sel->add($stdout_fd);
1485 $fd_job{$stdout_fd} = $jobstepidx;
1488 # select on all reader fds with 0.1s timeout
1489 my @ready_fds = $sel->can_read(0.1);
1490 foreach my $fd (@ready_fds)
1493 if (0 < sysread ($fd, $buf, 65536))
1496 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1498 my $jobstepidx = $fd_job{$fd};
# Both of a jobstep's fds map to the same index, so compare against
# stdout_r to tell them apart.
# NOTE(review): when stdout_r is undef, this `==` emits an "uninitialized"
# warning under `use warnings`.
1499 if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1500 $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1504 $jobstep[$jobstepidx]->{stderr_at} = time;
1505 $jobstep[$jobstepidx]->{stderr} .= $buf;
1507 # Consume everything up to the last \n
1508 preprocess_stderr ($jobstepidx);
1510 if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1512 # If we get a lot of stderr without a newline, chop off the
1513 # front to avoid letting our buffer grow indefinitely.
1514 substr ($jobstep[$jobstepidx]->{stderr},
1515 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1523 # Consume all full lines of stderr for a jobstep. Everything after the
1524 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1526 sub preprocess_stderr
1528 my $jobstepidx = shift;
1529 # slotindex is only defined for children running Arvados job tasks.
1530 # Be prepared to handle the undef case (for setup srun calls, etc.).
1531 my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
# Log each complete line, then classify known srun and Keep errors.
1533 while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1535 substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1536 Log ($jobstepidx, "stderr $line");
# The allocation itself is gone: freeze, and count a failure on every node.
# NOTE(review): $ENV{SLURM_JOB_ID} is interpolated into the pattern
# unquoted. That is harmless for a numeric id, but \Q...\E would be safer.
1537 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
1538 # If the allocation is revoked, we can't possibly continue, so mark all
1539 # nodes as failed. This will cause the overall exit code to be
1540 # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
1542 $main::please_freeze = 1;
1543 foreach my $st (@slot) {
1544 $st->{node}->{fail_count}++;
# Node-level srun errors: mark the failure temporary and, for a task slot,
# count a failure against the node and back it off.
1547 elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
1548 $jobstep[$jobstepidx]->{tempfail} = 1;
1549 if (defined($job_slot_index)) {
1550 $slot[$job_slot_index]->{node}->{fail_count}++;
1551 ban_node_by_slot($job_slot_index);
# Step-creation and communication errors: mark the failure temporary and
# back off the node, without incrementing its fail_count.
1554 elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
1555 $jobstep[$jobstepidx]->{tempfail} = 1;
1556 ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
# Keep read/write/request errors: mark the failure temporary, nothing more.
1558 elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1559 $jobstep[$jobstepidx]->{tempfail} = 1;
1565 # Read whatever is still available on its stderr+stdout pipes after
1566 # the given child process has exited.
1567 sub readfrompipes_after_exit
1569 my $jobstepidx = shift;
1571 # The fact that the child has exited allows some convenient
1572 # simplifications: (1) all data must have already been written, so
1573 # there's no need to wait for more once sysread returns 0; (2) the
1574 # total amount of data available is bounded by the pipe buffer size,
1575 # so it's safe to read everything into one string.
1577 while (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) {
1578 $jobstep[$jobstepidx]->{stderr_at} = time;
1579 $jobstep[$jobstepidx]->{stderr} .= $buf;
1581 if ($jobstep[$jobstepidx]->{stdout_r}) {
1582 while (0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) {
1583 $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
# Handle complete lines as usual, then log any trailing partial line and
# clear the buffer.
# NOTE(review): the split/log construct ending at 1590 appears to be a map
# in void context, used only for its logging side effect.
1586 preprocess_stderr ($jobstepidx);
1589 Log ($jobstepidx, "stderr $_");
1590 } split ("\n", $jobstep[$jobstepidx]->{stderr});
1591 $jobstep[$jobstepidx]->{stderr} = '';
# Fetch the Keep block $hash by reading the stdout of
# `arv-get --retries N $hash` in sysread chunks of up to 1 MiB. Returns the
# block contents, or undef after logging if arv-get cannot be started, a
# read fails, or arv-get reports a failing exit status.
1598 if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1599 Log(undef, "fetch_block run error from arv-get $hash: $!");
1602 my $output_block = "";
1605 my $bytes = sysread($keep, $buf, 1024 * 1024);
1606 if (!defined $bytes) {
1607 Log(undef, "fetch_block read error from arv-get: $!");
1608 $output_block = undef;
1610 } elsif ($bytes == 0) {
1611 # sysread returns 0 at the end of the pipe.
1614 # some bytes were read into buf.
1615 $output_block .= $buf;
1620 Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1621 $output_block = undef;
1623 return $output_block;
1626 # Create a collection by concatenating the output of all tasks (each
1627 # task's output is either a manifest fragment, a locator for a
1628 # manifest fragment stored in Keep, or nothing at all). Return the
1629 # portable_data_hash of the new collection.
1630 sub create_output_collection
1632 Log (undef, "collate");
# A python helper creates the collection. It reads the manifest text on
# stdin, takes num_retries and owner_uuid from argv, and prints the new
# collection's portable_data_hash.
1634 my ($child_out, $child_in);
1635 my $pid = open2($child_out, $child_in, 'python', '-c', q{
1638 print (arvados.api("v1").collections().
1639 create(body={"manifest_text": sys.stdin.read(),
1640 "owner_uuid": sys.argv[2]}).
1641 execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1642 }, retry_count(), $Job->{owner_uuid});
# Stream each task's output to the helper. Keep locators are resolved with
# fetch_block, literal manifest text is sent as-is, and tasks with no
# output are skipped.
1645 my $manifest_size = 0;
1649 my $output = $_->{'arvados_task'}->{output};
1650 next if (!defined($output));
1652 if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1653 $next_write = fetch_block($output);
1655 $next_write = $output;
1657 if (defined($next_write)) {
1658 if (!defined(syswrite($child_in, $next_write))) {
1659 # There's been an error writing. Stop the loop.
1660 # We'll log details about the exit code later.
1663 $manifest_size += length($next_write);
1666 my $uuid = $_->{'arvados_task'}->{'uuid'};
1667 Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1672 Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
# Wait up to 120 seconds for the helper's reply.
1675 my $s = IO::Select->new($child_out);
1676 if ($s->can_read(120)) {
1677 sysread($child_out, $joboutput, 1024 * 1024);
1680 Log(undef, "output collection creation exited " . exit_status_s($?));
1686 Log (undef, "timed out while creating output collection");
# Escalate from SIGINT to SIGTERM to SIGKILL until the helper has been
# reaped. waitpid(..., WNOHANG) returns 0 while the child is still
# running, the pid once it has been reaped, and -1 only when there is no
# such child. Testing for -1 let the loop send one more signal after the
# child had already been reaped, to a pid that may have been recycled.
# Stop on any non-zero return instead.
1687 foreach my $signal (2, 2, 2, 15, 15, 9) {
1688 kill($signal, $pid);
1689 last if waitpid($pid, WNOHANG) != 0;
1698 # Calls create_output_collection, logs the result, and returns it.
1699 # If that was successful, save that as the output in the job record.
# Returns the portable data hash, or the false value from
# create_output_collection on failure.
1700 sub save_output_collection {
1701 my $collated_output = create_output_collection();
1703 if (!$collated_output) {
1704 Log(undef, "Failed to write output collection");
1707 Log(undef, "job output $collated_output");
1708 $Job->update_attributes('output' => $collated_output);
1710 return $collated_output;
# Escalating signal delivery for each pid in $_ (the loop header is not
# visible here). Send SIGINT first, SIGTERM once SIGINT was sent more than
# 4 seconds ago, and SIGKILL once SIGTERM was sent more than 4 seconds
# ago. Each signal level is used at most once per pid, tracked in
# $proc{$pid}->{"sent_$sig"}. killedafter records the child's age at the
# time it was signalled.
1717 my $sig = 2; # SIGINT first
1718 if (exists $proc{$_}->{"sent_$sig"} &&
1719 time - $proc{$_}->{"sent_$sig"} > 4)
1721 $sig = 15; # SIGTERM if SIGINT doesn't work
1723 if (exists $proc{$_}->{"sent_$sig"} &&
1724 time - $proc{$_}->{"sent_$sig"} > 4)
1726 $sig = 9; # SIGKILL if SIGTERM doesn't work
1728 if (!exists $proc{$_}->{"sent_$sig"})
1730 Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1732 select (undef, undef, undef, 0.1);
1735 kill $sig, $_; # srun wants two SIGINT to really interrupt
1737 $proc{$_}->{"sent_$sig"} = time;
1738 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set the bit for fileno($_) in the $bits vector (presumably a
# select()-style fd mask built from a list of handles; confirm with callers).
1748 vec($bits,fileno($_),1) = 1;
1754 # Send log output to Keep via arv-put.
1756 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1757 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1758 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1759 # $log_pipe_pid is the pid of the arv-put subprocess.
1761 # The only functions that should access these variables directly are:
1763 # log_writer_start($logfilename)
1764 # Starts an arv-put pipe, reading data on stdin and writing it to
1765 # a $logfilename file in an output collection.
1767 # log_writer_read_output([$timeout])
1768 # Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1769 # Passes $timeout to the select() call, with a default of 0.01.
1770 # Returns the result of the last read() call on $log_pipe_out, or
1771 # -1 if read() wasn't called because select() timed out.
1772 # Only other log_writer_* functions should need to call this.
1774 # log_writer_send($txt)
1775 # Writes $txt to the output log collection.
1777 # log_writer_finish()
1778 # Closes the arv-put pipe and returns the output that it produces.
1780 # log_writer_is_active()
1781 # Returns a true value if there is currently a live arv-put
1782 # process, false otherwise.
# log_writer_finish() resets every one of these variables to undef once
# arv-put has been reaped.
1784 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
# Start arv-put through open2 so that it stores its stdin as $logfilename
# (part of the argument list is on lines not visible here). Then reset the
# output buffer and create the IO::Select watcher for arv-put's stdout.
1787 sub log_writer_start($)
1789 my $logfilename = shift;
1790 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1794 '--filename', $logfilename,
1796 $log_pipe_out_buf = "";
1797 $log_pipe_out_select = IO::Select->new($log_pipe_out);
# Append whatever arv-put has written to $log_pipe_out_buf. Before each
# read, wait up to $timeout seconds (default 0.01) for data. Stops when
# select times out or read returns 0 or undef, and logs read errors.
1800 sub log_writer_read_output {
1801 my $timeout = shift || 0.01;
1803 while ($read && $log_pipe_out_select->can_read($timeout)) {
1804 $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1805 length($log_pipe_out_buf));
1807 if (!defined($read)) {
1808 Log(undef, "error reading log manifest from arv-put: $!");
# Write $txt to arv-put's stdin, then opportunistically drain arv-put's
# output (presumably so arv-put never blocks on a full stdout pipe).
1813 sub log_writer_send($)
1816 print $log_pipe_in $txt;
1817 log_writer_read_output();
# Close arv-put's stdin, read its output to EOF (waiting up to 600s), and
# reap it. Returns the collected output (the log manifest) on success, or
# undef if reading timed out, did not reach EOF, or arv-put's exit status
# was non-zero. All log_writer state is reset afterwards. Does nothing
# if no writer is running.
1820 sub log_writer_finish()
1822 return unless $log_pipe_pid;
1824 close($log_pipe_in);
1826 my $logger_failed = 0;
1827 my $read_result = log_writer_read_output(600);
1828 if ($read_result == -1) {
1829 $logger_failed = -1;
1830 Log (undef, "timed out reading from 'arv-put'");
1831 } elsif ($read_result != 0) {
1832 $logger_failed = -2;
1833 Log(undef, "failed to read arv-put log manifest to EOF");
1836 waitpid($log_pipe_pid, 0);
1838 $logger_failed ||= $?;
1839 Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1842 close($log_pipe_out);
1843 my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1844 $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1845 $log_pipe_out_select = undef;
1847 return $arv_put_output;
# Returns the arv-put pid (true) while a log writer is running, else undef.
1850 sub log_writer_is_active() {
1851 return $log_pipe_pid;
# Log a message for jobstep $jobstepidx (undef for job-level messages).
# Multi-line messages are logged one line at a time. Each line is prefixed
# with the job id, this process's pid, and the task sequence. Any
# character outside printable ASCII (space through ~) is escaped as \ooo.
# The line goes to STDERR, timestamped only when STDERR is a terminal.
# When the arv-put log writer is active, it also receives the line with a
# UTC timestamp.
1854 sub Log # ($jobstepidx, $logmessage)
1856 my ($jobstepidx, $logmessage) = @_;
1857 if ($logmessage =~ /\n/) {
1858 for my $line (split (/\n/, $_[1])) {
1859 Log ($jobstepidx, $line);
# Turn on autoflush for STDERR without changing the selected output handle.
1863 my $fh = select STDERR; $|=1; select $fh;
1865 if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1866 $task_qseq = $jobstepidx;
1868 my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1869 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1872 if (log_writer_is_active() || -t STDERR) {
1873 my @gmtime = gmtime;
1874 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1875 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1877 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1879 if (log_writer_is_active()) {
1880 log_writer_send($datetime . " " . $message);
# Fatal-error handler. Log the message with the caller's file and line,
# release the slurm allocation, and, if tasks remain, freeze and collate
# whatever output exists. Later steps are on lines not visible here.
1887 my ($package, $file, $line) = caller;
1888 my $message = "@_ at $file line $line\n";
1889 Log (undef, $message);
1890 release_allocation();
1891 freeze() if @jobstep_todo;
1892 create_output_collection() if @jobstep_todo;
# Final job-record update. A Cancelled job gets finished_at stamped.
# Otherwise (the else-branch presumably starts on a hidden line) the state
# is set to Failed.
# NOTE(review): `scalar gmtime` produces a ctime-style string such as
# "Thu Sep  9 12:00:00 2021", unlike the ISO-8601 strftime format used for
# task timestamps elsewhere in this file. Confirm the API parses it.
1902 if ($Job->{'state'} eq 'Cancelled') {
1903 $Job->update_attributes('finished_at' => scalar gmtime);
1905 $Job->update_attributes('state' => 'Failed');
# Finish the arv-put log writer and save its manifest as the job's log
# collection. When the job already has a log collection ($Job->{log}), its
# manifest text is prepended; the guard for that appears to be on a hidden
# line. The new portable data hash is then recorded on the job and
# returned. Returns early for checkpoints, when no writer is active, or
# when the writer produced no manifest.
1912 my $justcheckpoint = shift; # false if this will be the last meta saved
1913 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1914 return unless log_writer_is_active();
1915 my $log_manifest = log_writer_finish();
1916 return unless defined($log_manifest);
1919 my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1920 $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1923 my $log_coll = api_call(
1924 "collections/create", ensure_unique_name => 1, collection => {
1925 manifest_text => $log_manifest,
1926 owner_uuid => $Job->{owner_uuid},
1927 name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1929 Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1930 $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1932 return $log_coll->{portable_data_hash};
# If a freeze was requested: release the allocation, signal the children
# through killem, reap whatever has exited (the surrounding loop is on
# hidden lines), and collate the output gathered so far. Any pids passed
# in are first given fresh (empty) %proc entries so that they are
# signalled too.
# NOTE(review): the map at 1944 is used in void context for its side
# effect, and it replaces any existing %proc entry for those pids.
1936 sub freeze_if_want_freeze
1938 if ($main::please_freeze)
1940 release_allocation();
1943 # kill some srun procs before freeze+stop
1944 map { $proc{$_} = {} } @_;
1947 killem (keys %proc);
1948 select (undef, undef, undef, 0.1);
1950 while (($died = waitpid (-1, WNOHANG)) > 0)
1952 delete $proc{$died};
1957 create_output_collection();
# Checkpointing is not implemented; this only logs that fact.
1967 Log (undef, "Freeze not implemented");
# Resuming a frozen job is not supported; croak immediately.
1974 croak ("Thaw not implemented");
# Undo backslash escaping: "\n" becomes a newline, and "\X" becomes X for
# any other character X.
1990 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run a command via srun() in a forked child and wait for it, capturing
# its stdout and stderr through pipes. While waiting, stderr is processed
# like a job task's (through the %reader and @jobstep entries added
# below), and refresh and freeze requests are honoured. Returns
# ($exited, stdout_captured, stderr_captured, tempfail) for the command.
1996 my $srunargs = shift;
1997 my $execargs = shift;
1998 my $opts = shift || {};
2001 my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
2002 Log (undef, "$label: start");
2004 my ($stderr_r, $stderr_w);
2005 pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
2007 my ($stdout_r, $stdout_w);
2008 pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
2010 my $srunpid = fork();
# In the child: make the pipe write ends its STDERR/STDOUT, then run srun.
# NOTE(review): F_SETFL sets file *status* flags such as O_NONBLOCK, not
# FD_CLOEXEC (that is F_SETFD), so the "no close-on-exec" remark is
# misleading. The dup onto STDERR/STDOUT below is what keeps the pipes open
# across exec.
2015 fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
2016 fcntl($stdout_w, F_SETFL, 0) or croak($!);
2017 open(STDERR, ">&", $stderr_w) or croak ($!);
2018 open(STDOUT, ">&", $stdout_w) or croak ($!);
2019 srun ($srunargs, $execargs, $opts, $stdin);
# In the parent: read both pipes without blocking.
2025 set_nonblocking($stderr_r);
2026 set_nonblocking($stdout_r);
2028 # Add entries to @jobstep and %proc so check_squeue() and
2029 # freeze_if_want_freeze() can treat it like a job task process.
2033 stderr_captured => '',
2034 stdout_r => $stdout_r,
2035 stdout_captured => '',
2037 my $jobstepidx = $#jobstep;
2039 jobstepidx => $jobstepidx,
2041 $reader{$jobstepidx} = $stderr_r;
# Poll until the srun child exits, pumping its pipes and handling refresh
# and freeze requests.
# NOTE(review): if waitpid ever returns -1 here (for example, because the
# child was already reaped by a waitpid(-1, ...) loop elsewhere), this
# condition never becomes false. Confirm that cannot happen.
2043 while ($srunpid != waitpid ($srunpid, WNOHANG)) {
2044 my $busy = readfrompipes();
2045 if (!$busy || ($latest_refresh + 2 < scalar time)) {
2046 check_refresh_wanted();
2051 select(undef, undef, undef, 0.1);
2053 killem(keys %proc) if $main::please_freeze;
# Collect output written just before exit, log the exit status, and
# remove the temporary %proc, %reader, and @jobstep entries.
2057 readfrompipes_after_exit ($jobstepidx);
2059 Log (undef, "$label: exit ".exit_status_s($exited));
2063 delete $proc{$srunpid};
2064 delete $reader{$jobstepidx};
2066 my $j = pop @jobstep;
2067 # If the srun showed signs of tempfail, ensure the caller treats that as a
2069 if ($main::please_freeze || $j->{tempfail}) {
2072 return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
# Build the command line: srun arguments followed by the command when
# $have_slurm is set, otherwise the bare command. Log it with TOKEN=
# values masked. With $opts->{fork}, run it via system() and return the
# status; otherwise exec it, dying if exec fails. When $stdin is defined,
# a forked child writes it to the command's STDIN.
# NOTE(review): Data::Dumper's Terse/Indent globals are set permanently
# here rather than with `local`.
2078 my $srunargs = shift;
2079 my $execargs = shift;
2080 my $opts = shift || {};
2082 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2084 $Data::Dumper::Terse = 1;
2085 $Data::Dumper::Indent = 0;
2086 my $show_cmd = Dumper($args);
2087 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2088 $show_cmd =~ s/\n/ /g;
2089 if ($opts->{fork}) {
2090 Log(undef, "starting: $show_cmd");
2092 # This is a child process: parent is in charge of reading our
2093 # stderr and copying it to Log() if needed.
2094 warn "starting: $show_cmd\n";
# Forking open: in the child, STDOUT feeds the parent's new STDIN, so the
# child prints $stdin and closes STDOUT.
2097 if (defined $stdin) {
2098 my $child = open STDIN, "-|";
2099 defined $child or die "no fork: $!";
2101 print $stdin or die $!;
2102 close STDOUT or die $!;
2107 return system (@$args) if $opts->{fork};
2110 warn "ENV size is ".length(join(" ",%ENV));
2111 die "exec failed: $!: @$args";
# Put the node behind slot $slotid on hold for 60 seconds and count the
# hold. The main loop stops treating a node's slots as working once the
# node has 4 holds.
2115 sub ban_node_by_slot {
2116 # Don't start any new jobsteps on this node for 60 seconds
2118 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2119 $slot[$slotid]->{node}->{hold_count}++;
2120 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Take an exclusive, non-blocking flock on $lockfile, or croak with
# $error_message if the lock is unavailable. The bareword handle L is
# package-global, so the lock is still held after this sub returns. A
# lexical handle would be closed, releasing the lock, when it went out of
# scope.
2125 my ($lockfile, $error_message) = @_;
2126 open L, ">", $lockfile or croak("$lockfile: $!");
2127 if (!flock L, LOCK_EX|LOCK_NB) {
2128 croak("Can't lock $lockfile: $error_message\n");
2132 sub find_docker_image {
2133 # Given a Keep locator, check to see if it contains a Docker image.
2134 # If so, return its stream name and Docker hash.
2135 # If not, return undef for both values.
2136 my $locator = shift;
2137 my ($streamname, $filename);
2138 my $image = api_call("collections/get", uuid => $locator);
# Scan every manifest stream. Finding a second file token anywhere means
# this is not an image collection.
2140 foreach my $line (split(/\n/, $image->{manifest_text})) {
2141 my @tokens = split(/\s+/, $line);
2143 $streamname = shift(@tokens);
2144 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2145 if (defined($filename)) {
2146 return (undef, undef); # More than one file in the Collection.
2148 $filename = (split(/:/, $filedata, 3))[2];
# Accept only a single file named <64 hex digits>.tar, optionally prefixed
# with "sha256:"; the captured name (without .tar) is the Docker hash.
2153 if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
2154 return ($streamname, $1);
2156 return (undef, undef);
# Log the reason, then exit with status EX_RETRY_UNLOCKED to ask for the
# job to be re-dispatched.
2160 sub exit_retry_unlocked {
2161 Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
2162 exit(EX_RETRY_UNLOCKED);
2166 # Calculate the number of times an operation should be retried,
2167 # assuming exponential backoff, and that we're willing to retry as
2168 # long as tasks have been running. Enforce a minimum of 3 retries.
# timediff is how long tasks have been running (minus the time since the
# last recorded finish, if any); retries = floor(log2(timediff)), at least 3.
# NOTE(review): `$jobstep[0]->{starttime}` autovivifies $jobstep[0] when
# @jobstep is empty. Confirm these reads are guarded; the guard, if there
# is one, is on a line not visible here.
2169 my ($starttime, $endtime, $timediff, $retries);
2171 $starttime = $jobstep[0]->{starttime};
2172 $endtime = $jobstep[-1]->{finishtime};
2174 if (!defined($starttime)) {
2176 } elsif (!defined($endtime)) {
2177 $timediff = time - $starttime;
2179 $timediff = ($endtime - $starttime) - (time - $endtime);
2181 if ($timediff > 0) {
2182 $retries = int(log($timediff) / log(2));
2184 $retries = 1; # Use the minimum.
2186 return ($retries > 3) ? $retries : 3;
2190 # Pass in a function reference and a text description of the operation.
2191 # The function will be called with the remaining arguments.
2192 # If it dies, retry it with exponential backoff until it succeeds,
2193 # or until the current retry_count is exhausted. After each failure
2194 # that can be retried, a log line is written with the description,
2195 # the cleaned-up error message, and the time of the next try.
# The wait before try N is about 2**N seconds. After the last try, the
# error is re-raised (that code is on lines not visible here).
2196 my $operation = shift;
2197 my $op_text = shift;
2198 my $retries = retry_count();
2199 my $retry_callback = sub {
2200 my ($try_count, $next_try_at, $errmsg) = @_;
2201 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2202 $errmsg =~ s/\s/ /g;
2203 $errmsg =~ s/\s+$//;
2205 if ($next_try_at < time) {
2206 $retry_msg = "Retrying.";
2208 my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2209 $retry_msg = "Retrying at $next_try_fmt.";
2211 Log(undef, "$op_text failed: $errmsg. $retry_msg");
2213 foreach my $try_count (0..$retries) {
2214 my $next_try = time + (2 ** $try_count);
2215 my $result = eval { $operation->(@_); };
2218 } elsif ($try_count < $retries) {
2219 $retry_callback->($try_count, $next_try, $@);
2220 my $sleep_time = $next_try - time;
2221 sleep($sleep_time) if ($sleep_time > 0);
2224 # Ensure the error message ends in a newline, so Perl doesn't add
2225 # retry_op's line number to it.
2231 # Pass in a /-separated API method name, and arguments for it.
2232 # This function will call that method, retrying as needed until
2233 # the current retry_count is exhausted, logging each failure that is retried.
# The method object is found by walking the client object one path
# component at a time (the starting object is assigned on a hidden line).
2234 my $method_name = shift;
2236 foreach my $key (split(/\//, $method_name)) {
2237 $method = $method->{$key};
2239 return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
2243 # Given a $?, return a human-readable exit code string like "0" or
2244 # "1" or "0 with signal 1" or "1 with signal 11".
# In $?, the high byte is the exit code, the low 7 bits are the signal
# number, and bit 0x80 is the core-dump flag.
2245 my $exitcode = shift;
2246 my $s = $exitcode >> 8;
2247 if ($exitcode & 0x7f) {
2248 $s .= " with signal " . ($exitcode & 0x7f);
2250 if ($exitcode & 0x80) {
2251 $s .= " with core dump";
2256 sub handle_readall {
2257 # Pass in a glob reference to a file handle.
2258 # Read all its contents and return them as a string.
2259 my $fh_glob_ref = shift;
# NOTE(review): <$fh> follows the caller's context. In scalar context it
# returns everything only if $/ is undef, presumably localized on a line
# not visible here. Confirm.
2261 return <$fh_glob_ref>;
# Path of the $n-th saved git archive: $CRUNCH_TMP/git.<job_id>.<n>.tar
2264 sub tar_filename_n {
2266 return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2269 sub add_git_archive {
2270 # Pass in a git archive command as a string or list, a la system().
2271 # This method will save its output to be included in the archive sent to the
# Git's stdout is redirected straight into the next numbered tar file via
# open2's ">&GIT_ARCHIVE" form. Wait for git to finish and croak if it
# failed.
2275 if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2276 croak("Failed to save git archive: $!");
2278 my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2280 waitpid($git_pid, 0);
2283 croak("Failed to save git archive: git exited " . exit_status_s($?));
2287 sub combined_git_archive {
2288 # Combine all saved tar archives into a single archive, then return its
2289 # contents in a string. Return undef if no archives have been saved.
2290 if ($git_tar_count < 1) {
# Append archives 2..N onto archive 1 with `tar -A`, croaking on any
# failure, then read the combined file back.
2293 my $base_tar_name = tar_filename_n(1);
2294 foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2295 my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2296 if ($tar_exit != 0) {
2297 croak("Error preparing build archive: tar -A exited " .
2298 exit_status_s($tar_exit));
2301 if (!open(GIT_TAR, "<", $base_tar_name)) {
2302 croak("Could not open build archive: $!");
2304 my $tar_contents = handle_readall(\*GIT_TAR);
2306 return $tar_contents;
# Add O_NONBLOCK to $fh's file status flags, croaking on failure. fcntl
# returns "0 but true" for a zero flag word, so `or croak` fires only on
# real errors.
2309 sub set_nonblocking {
2311 my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2312 fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2318 # This is crunch-job's internal dispatch script. crunch-job running on the API
2319 # server invokes this script on individual compute nodes, or localhost if we're
2320 # running a job locally. It gets called in two modes:
2322 # * No arguments: Installation mode. Read a tar archive from the DATA
2323 # file handle; it includes the Crunch script's source code, and
2324 # maybe SDKs as well. Those should be installed in the proper
2325 # locations. This runs outside of any Docker container, so don't try to
2326 # introspect Crunch's runtime environment.
2328 # * With arguments: Crunch script run mode. This script should set up the
2329 # environment, then run the command specified in the arguments. This runs
2330 # inside any Docker container.
2333 use File::Path qw( make_path remove_tree );
2334 use POSIX qw(getcwd);
2336 use constant TASK_TEMPFAIL => 111;  # exit code handed to shell_or_die for failures presumably meant to be retried -- confirm
2338 # Map SDK subdirectories to the path environment variables they belong to.
2339 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2341 my $destdir = $ENV{"CRUNCH_SRC"};
2342 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2343 my $repo = $ENV{"CRUNCH_SRC_URL"};
2344 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");  # default: ./opt under the current directory
2345 my $job_work = $ENV{"JOB_WORK"};
2346 my $task_work = $ENV{"TASK_WORK"};
2348 open(STDOUT_ORIG, ">&", STDOUT);  # keep the original stdout/stderr: STDOUT/STDERR may later be redirected to $destdir.log
2349 open(STDERR_ORIG, ">&", STDERR);
2351 for my $dir ($destdir, $job_work, $task_work) {
2354 -e $dir or die "Failed to create temporary directory ($dir): $!";
2359 remove_tree($task_work, {keep_root => 1});  # empty $task_work but keep the directory itself
2362 ### Crunch script run mode: set up the SDK environment, then exec @ARGV
2364 # We want to do routine logging during task 0 only. This gives the user
2365 # the information they need, but avoids repeating the information for every
2368 if ($ENV{TASK_SEQUENCE} eq "0") {
2371 printf STDERR_ORIG "[Crunch] $msg\n", @_;  # $msg is used as the printf format; @_ fills its placeholders
2377 my $python_src = "$install_dir/python";
2378 my $venv_dir = "$job_work/.arvados.venv";
2379 my $venv_built = -e "$venv_dir/bin/activate";  # skip building if the activate script already exists
2380 if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2381 shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2382 "--python=python2.7", $venv_dir);
2383 shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);  # pip -I: ignore already-installed packages and install from $python_src
2385 $Log->("Built Python SDK virtualenv");
2388 my @pysdk_version_cmd = ("python", "-c",
2389 "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2391 $Log->("Running in Python SDK virtualenv");
2392 @pysdk_version_cmd = ();
2393 my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);  # backslash-escape each argument so sh re-parses it as the same word
2394 @ARGV = ("/bin/sh", "-ec",
2395 ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2396 } elsif (-d $python_src) {
2397 $Log->("Warning: virtualenv not found inside Docker container default " .
2398 "\$PATH. Can't install Python SDK.");
2401 if (@pysdk_version_cmd) {
2402 open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);  # NOTE(review): open() failure is not checked on this line
2403 my $pysdk_version = <$pysdk_version_pipe>;
2404 close($pysdk_version_pipe);  # close() on a pipe sets $? to the command's wait status
2406 chomp($pysdk_version);
2407 $Log->("Using Arvados SDK version $pysdk_version");
2409 # A lot could've gone wrong here, but pretty much all of it means that
2410 # Python won't be able to load the Arvados SDK.
2411 $Log->("Warning: Arvados SDK not found");
2415 while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {  # prepend each SDK lib dir to its search-path variable (PERLLIB, RUBYLIB)
2416 my $sdk_path = "$install_dir/$sdk_dir";
2418 if ($ENV{$sdk_envkey}) {
2419 $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2421 $ENV{$sdk_envkey} = $sdk_path;
2423 $Log->("Arvados SDK added to %s", $sdk_envkey);
2428 die "Cannot exec `@ARGV`: $!";  # NOTE(review): presumably reached only if the exec of @ARGV fails -- confirm
2431 ### Installation mode: unpack the tar archive from DATA into $destdir, then run its install script
2432 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";  # NOTE(review): presumably flock()ed right after so concurrent installs serialize -- confirm
2434 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {  # the symlink's target names the installed archive; readlink gives undef if it is missing
2435 # This exact git archive (source + arvados sdk) is already installed
2436 # here, so there's no need to reinstall it.
2438 # We must consume our DATA section, though: otherwise the process
2439 # feeding it to us will get SIGPIPE.
2441 while (read(DATA, $buf, 65536)) { }
2446 unlink "$destdir.archive_hash";  # drop the marker first; it is only re-created at the end, after the install steps
2450 # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2451 local $SIG{PIPE} = "IGNORE";
2452 warn "Extracting archive: $archive_hash\n";
2453 # --ignore-zeros is necessary sometimes: depending on how much NUL
2454 # padding tar -A put on our combined archive (which in turn depends
2455 # on the length of the component archives) tar without
2456 # --ignore-zeros will exit before consuming stdin and cause close()
2457 # to fail on the resulting SIGPIPE.
2458 if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2459 die "Error launching 'tar -xC $destdir': $!";
2461 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2462 # get SIGPIPE. We must feed it data incrementally.
2464 while (read(DATA, $tar_input, 65536)) {
2465 print TARX $tar_input;
2468 die "'tar -xC $destdir' exited $?: $!";  # $? is presumably close(TARX)'s raw wait status (not shifted by 8) -- confirm
2474 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2476 foreach my $sdk_lang (("python",
2477 map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {  # "perl/lib" -> "perl", etc.: the top-level SDK directories
2478 if (-d "$sdk_root/$sdk_lang") {
2479 if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {  # NOTE(review): rename() fails (EXDEV) if $install_dir is on another filesystem
2480 die "Failed to install $sdk_lang SDK: $!";
2486 my $python_dir = "$install_dir/python";
2487 if ((-d $python_dir) and can_run("python2.7")) {
2488 open(my $egg_info_pipe, "-|",
2489 "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");  # capture egg_info's stderr only; its stdout is discarded
2490 my @egg_info_errors = <$egg_info_pipe>;
2491 close($egg_info_pipe);
2494 if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {  # only the last stderr line is inspected
2495 # egg_info apparently failed because it couldn't ask git for a build tag.
2496 # Specify no build tag.
2497 open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");  # NOTE(review): open() failure is not checked on this line
2498 print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2501 my $egg_info_exit = $? >> 8;
2502 foreach my $errline (@egg_info_errors) {
2505 warn "python setup.py egg_info failed: exit $egg_info_exit";
2506 exit ($egg_info_exit || 1);
2511 # Hide messages from the install script (unless it fails: shell_or_die
2512 # will show $destdir.log in that case).
2513 open(STDOUT, ">>", "$destdir.log") or die ($!);
2514 open(STDERR, ">&", STDOUT) or die ($!);
2516 if (-e "$destdir/crunch_scripts/install") {
2517 shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2518 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {  # NOTE(review): these relative paths assume the cwd is $destdir -- confirm
2520 shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2521 } elsif (-e "./install.sh") {
2522 shell_or_die (undef, "./install.sh", $install_dir);
2525 if ($archive_hash) {
2526 unlink "$destdir.archive_hash.new";
2527 symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";  # build the new marker under a temporary name...
2528 rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";  # ...then rename it over the old one atomically
2534 my $command_name = shift;
2535 open(my $which, "-|", "which", $command_name) or die ($!);  # ask which(1) whether $command_name is found on $PATH
2536 while (<$which>) { }  # drain which's output; presumably close()/$? then yields the answer -- confirm
2543 my $exitcode = shift;
2545 if ($ENV{"DEBUG"}) {
2546 print STDERR "@_\n";
2548 if (system (@_) != 0) {
2551 my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2552 open STDERR, ">&STDERR_ORIG";
2553 system ("cat $destdir.log >&2");
2554 warn "@_ failed ($err): $exitstatus";
2555 if (defined($exitcode)) {
2559 exit (($code >> 8) || 1);