2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by the scheduler on the controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from the command line, run tasks on the local machine
16 (typically invoked by an application or developer on a VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
71 Save a checkpoint and continue.
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or deallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
91 use Digest::MD5 qw(md5_hex);
97 use File::Path qw( make_path remove_tree );
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106 if ($ENV{"USER"} ne "crunch" && $< != 0) {
107 # use a tmp dir unique to my uid
108 $ENV{"CRUNCH_TMP"} .= "-$<";
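# For example, a developer running as uid 1000 (neither root nor the
# "crunch" user) with TMPDIR=/tmp ends up with CRUNCH_TMP=/tmp/crunch-job-1000,
# while root and the "crunch" user share /tmp/crunch-job.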
112 $ENV{"HOST_CRUNCHRUNNER_BIN"} ||= `which crunchrunner`;
113 unless (defined($ENV{"HOST_CERTS"})) {
114 if (-f "/etc/ssl/certs/ca-certificates.crt") {
115 $ENV{"HOST_CERTS"} = "/etc/ssl/certs/ca-certificates.crt";
116 } elsif (-f "/etc/pki/tls/certs/ca-bundle.crt") {
117 $ENV{"HOST_CERTS"} = "/etc/pki/tls/certs/ca-bundle.crt";
121 # Create the tmp directory if it does not exist
122 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
123 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
126 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
127 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
128 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
129 mkdir ($ENV{"JOB_WORK"});
138 my $cgroup_root = "/sys/fs/cgroup";
139 my $docker_bin = "docker.io";
140 my $docker_run_args = "";
141 GetOptions('force-unlock' => \$force_unlock,
142 'git-dir=s' => \$git_dir,
143 'job=s' => \$jobspec,
144 'job-api-token=s' => \$job_api_token,
145 'no-clear-tmp' => \$no_clear_tmp,
146 'resume-stash=s' => \$resume_stash,
147 'cgroup-root=s' => \$cgroup_root,
148 'docker-bin=s' => \$docker_bin,
149 'docker-run-args=s' => \$docker_run_args,
152 if (defined $job_api_token) {
153 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
156 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
161 $main::ENV{CRUNCH_DEBUG} = 1;
165 $main::ENV{CRUNCH_DEBUG} = 0;
168 my $arv = Arvados->new('apiVersion' => 'v1');
177 if ($jobspec =~ /^[-a-z\d]+$/)
179 # $jobspec is an Arvados UUID, not a JSON job specification
180 $Job = api_call("jobs/get", uuid => $jobspec);
185 $local_job = JSON::decode_json($jobspec);
189 # Make sure our workers (our slurm nodes, localhost, or whatever) are
190 # at least able to run basic commands: they aren't down or severely
193 if (($Job || $local_job)->{docker_image_locator}) {
194 $cmd = [$docker_bin, 'ps', '-q'];
196 Log(undef, "Sanity check is `@$cmd`");
197 my ($exited, $stdout, $stderr) = srun_sync(
198 ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
200 {label => "sanity check"});
202 Log(undef, "Sanity check failed: ".exit_status_s($exited));
205 Log(undef, "Sanity check OK");
208 my $User = api_call("users/current");
211 if (!$force_unlock) {
212 # Claim this job, and make sure nobody else does
213 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
215 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
224 map { croak ("No $_ specified") unless $local_job->{$_} }
225 qw(script script_version script_parameters);
228 $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
229 $local_job->{'started_at'} = gmtime;
230 $local_job->{'state'} = 'Running';
232 $Job = api_call("jobs/create", job => $local_job);
234 $job_id = $Job->{'uuid'};
236 my $keep_logfile = $job_id . '.log.txt';
237 log_writer_start($keep_logfile);
239 $Job->{'runtime_constraints'} ||= {};
240 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
241 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
243 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
245 $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
246 chomp($gem_versions);
247 chop($gem_versions); # strip the closing parenthesis
252 "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
254 Log (undef, "check slurm allocation");
257 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
261 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
262 push @sinfo, "$localcpus localhost";
264 if (exists $ENV{SLURM_NODELIST})
266 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
270 my ($ncpus, $slurm_nodelist) = split;
271 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
274 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
277 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
280 foreach (split (",", $ranges))
293 push @nodelist, map {
295 $n =~ s/\[[-,\d]+\]/$_/;
302 push @nodelist, $nodelist;
305 foreach my $nodename (@nodelist)
307 Log (undef, "node $nodename - $ncpus slots");
308 my $node = { name => $nodename,
310 # The number of consecutive times a task has been dispatched
311 # to this node and failed.
313 # The number of consecutive times that SLURM has reported
314 # a node failure since the last successful task.
316 # Don't dispatch work to this node until this time
317 # (in seconds since the epoch) has passed.
319 foreach my $cpu (1..$ncpus)
321 push @slot, { node => $node,
325 push @node, @nodelist;
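# Illustrative sketch (a hypothetical helper, never called by this script):
# a simplified, self-contained version of the SLURM hostlist expansion
# performed above, shown only to clarify the parsing. Input such as
# "compute[0-2,5],nodeA" yields ("compute0", "compute1", "compute2",
# "compute5", "nodeA"). The real code above interleaves this with the
# per-node slot bookkeeping.
sub expand_slurm_nodelist_example {
  my ($list) = @_;
  my @names;
  while ($list =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//) {
    my $element = $1;
    if ($element =~ /^(.*)\[([-,\d]+)\]$/) {
      # Bracketed ranges: expand each "N" or "N-M" and append to the prefix.
      my ($prefix, $ranges) = ($1, $2);
      foreach my $range (split(",", $ranges)) {
        my ($low, $high) = split("-", $range);
        $high = $low unless defined $high;
        push @names, map { "$prefix$_" } ($low..$high);
      }
    } else {
      # Plain hostname with no bracket expression.
      push @names, $element;
    }
  }
  return @names;
}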
330 # Ensure that we get one jobstep running on each allocated node before
331 # we start overloading nodes with concurrent steps
333 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
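# For example, with two 2-CPU nodes A and B, both cpu-1 slots (one on A,
# one on B) sort ahead of the cpu-2 slots, so the first two tasks land on
# different nodes before either node runs two tasks at once.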
336 $Job->update_attributes(
337 'tasks_summary' => { 'failed' => 0,
342 Log (undef, "start");
343 $SIG{'INT'} = sub { $main::please_freeze = 1; };
344 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
345 $SIG{'TERM'} = \&croak;
346 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
347 $SIG{'ALRM'} = sub { $main::please_info = 1; };
348 $SIG{'CONT'} = sub { $main::please_continue = 1; };
349 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
351 $main::please_freeze = 0;
352 $main::please_info = 0;
353 $main::please_continue = 0;
354 $main::please_refresh = 0;
355 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
357 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
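# Hypothetical example: if $Job->{knobs} were "NOCACHE.git=1\nMAX_STEPS=20",
# only the NOCACHE.* line would be copied into the environment
# ($ENV{"NOCACHE.git"} = "1"); other knob lines are ignored here and are
# merely logged later.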
358 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
359 $ENV{"JOB_UUID"} = $job_id;
362 my @jobstep_todo = ();
363 my @jobstep_done = ();
364 my @jobstep_tomerge = ();
365 my $jobstep_tomerge_level = 0;
366 my $squeue_checked = 0;
367 my $latest_refresh = scalar time;
371 if (defined $Job->{thawedfromkey})
373 thaw ($Job->{thawedfromkey});
377 my $first_task = api_call("job_tasks/create", job_task => {
378 'job_uuid' => $Job->{'uuid'},
383 push @jobstep, { 'level' => 0,
385 'arvados_task' => $first_task,
387 push @jobstep_todo, 0;
393 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
396 my $build_script = handle_readall(\*DATA);
397 my $nodelist = join(",", @node);
398 my $git_tar_count = 0;
400 if (!defined $no_clear_tmp) {
401 # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
402 # up work directories crunch_tmp/work, crunch_tmp/opt,
405 # TODO: When #5036 is done and widely deployed, we can limit mount's
406 # -t option to simply fuse.keep.
407 my ($exited, $stdout, $stderr) = srun_sync(
408 ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
409 ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
410 {label => "clean work dirs"});
412 exit(EX_RETRY_UNLOCKED);
416 # If this job requires a Docker image, install that.
417 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
418 if ($docker_locator = $Job->{docker_image_locator}) {
419 Log (undef, "Install docker image $docker_locator");
420 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
423 croak("No Docker image hash found from locator $docker_locator");
425 Log (undef, "docker image hash is $docker_hash");
426 $docker_stream =~ s/^\.//;
427 my $docker_install_script = qq{
428 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
429 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
433 my ($exited, $stdout, $stderr) = srun_sync(
434 ["srun", "--nodelist=" . join(',', @node)],
435 ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
436 {label => "load docker image"});
439 exit(EX_RETRY_UNLOCKED);
442 # Determine whether this version of Docker supports memory+swap limits.
443 ($exited, $stdout, $stderr) = srun_sync(
444 ["srun", "--nodes=1"],
445 [$docker_bin, 'run', '--help'],
446 {label => "check --memory-swap feature"});
447 $docker_limitmem = ($stdout =~ /--memory-swap/);
449 # Find a non-root Docker user to use.
450 # Tries the default user for the container, then 'crunch', then 'nobody',
451 # testing for whether the actual user id is non-zero. This defends against
452 # mistakes, not malice; we intend to harden the security in the future,
453 # so we don't want anyone getting used to their jobs running as root in their
455 my @tryusers = ("", "crunch", "nobody");
456 foreach my $try_user (@tryusers) {
459 if ($try_user eq "") {
460 $label = "check whether default user is UID 0";
463 $label = "check whether user '$try_user' is UID 0";
464 $try_user_arg = "--user=$try_user";
466 my ($exited, $stdout, $stderr) = srun_sync(
467 ["srun", "--nodes=1"],
469 "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
472 if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
473 $dockeruserarg = $try_user_arg;
474 if ($try_user eq "") {
475 Log(undef, "Container will run with default user");
477 Log(undef, "Container will run with $dockeruserarg");
483 if (!defined $dockeruserarg) {
484 croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
487 if ($Job->{arvados_sdk_version}) {
488 # The job also specifies an Arvados SDK version. Add the SDKs to the
489 # tar file for the build script to install.
490 Log(undef, sprintf("Packing Arvados SDK version %s for installation",
491 $Job->{arvados_sdk_version}));
492 add_git_archive("git", "--git-dir=$git_dir", "archive",
493 "--prefix=.arvados.sdk/",
494 $Job->{arvados_sdk_version}, "sdk");
498 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
499 # If script_version looks like an absolute path, *and* the --git-dir
500 # argument was not given -- which implies we were not invoked by
501 # crunch-dispatch -- we will use the given path as a working
502 # directory instead of resolving script_version to a git commit (or
503 # doing anything else with git).
504 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
505 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
508 # Resolve the given script_version to a git commit sha1. Also, if
509 # the repository is remote, clone it into our local filesystem: this
510 # ensures "git archive" will work, and is necessary to reliably
511 # resolve a symbolic script_version like "master^".
512 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
514 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
516 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
518 # If we're running under crunch-dispatch, it will have already
519 # pulled the appropriate source tree into its own repository, and
520 # given us that repo's path as $git_dir.
522 # If we're running a "local" job, we might have to fetch content
523 # from a remote repository.
525 # (Currently crunch-dispatch gives a local path with --git-dir, but
526 # we might as well accept URLs there too in case it changes its
528 my $repo = $git_dir || $Job->{'repository'};
530 # Repository can be remote or local. If remote, we'll need to fetch it
531 # to a local dir before doing `git log` et al.
534 if ($repo =~ m{://|^[^/]*:}) {
535 # $repo is a git url we can clone, like git:// or https:// or
536 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
537 # not recognized here because distinguishing that from a local
538 # path is too fragile. If you really need something strange here,
539 # use the ssh:// form.
540 $repo_location = 'remote';
541 } elsif ($repo =~ m{^\.*/}) {
542 # $repo is a local path to a git index. We'll also resolve ../foo
543 # to ../foo/.git if the latter is a directory. To help
544 # disambiguate local paths from named hosted repositories, this
545 # form must be given as ./ or ../ if it's a relative path.
546 if (-d "$repo/.git") {
547 $repo = "$repo/.git";
549 $repo_location = 'local';
551 # $repo is none of the above. It must be the name of a hosted
553 my $arv_repo_list = api_call("repositories/list",
554 'filters' => [['name','=',$repo]]);
555 my @repos_found = @{$arv_repo_list->{'items'}};
556 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
558 Log(undef, "Repository '$repo' -> "
559 . join(", ", map { $_->{'uuid'} } @repos_found));
562 croak("Error: Found $n_found repositories with name '$repo'.");
564 $repo = $repos_found[0]->{'fetch_url'};
565 $repo_location = 'remote';
567 Log(undef, "Using $repo_location repository '$repo'");
568 $ENV{"CRUNCH_SRC_URL"} = $repo;
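# Examples of the classification above: "https://github.com/curoverse/arvados.git"
# and "git@host:repo.git" are treated as remote URLs; "./scratch/repo" or an
# absolute path is treated as a local .git directory; a bare name such as
# "active/crunch-scripts" (hypothetical) is looked up as a hosted Arvados
# repository and its fetch_url is then used as a remote.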
570 # Resolve given script_version (we'll call that $treeish here) to a
571 # commit sha1 ($commit).
572 my $treeish = $Job->{'script_version'};
574 if ($repo_location eq 'remote') {
575 # We minimize excess object-fetching by re-using the same bare
576 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
577 # just keep adding remotes to it as needed.
578 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
579 my $gitcmd = "git --git-dir=\Q$local_repo\E";
581 # Set up our local repo for caching remote objects, making
583 if (!-d $local_repo) {
584 make_path($local_repo) or croak("Error: could not create $local_repo");
586 # This works (exits 0 and doesn't delete fetched objects) even
587 # if $local_repo is already initialized:
588 `$gitcmd init --bare`;
590 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
593 # If $treeish looks like a hash (or abbrev hash) we look it up in
594 # our local cache first, since that's cheaper. (We don't want to
595 # do that with tags/branches though -- those change over time, so
596 # they should always be resolved by the remote repo.)
597 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
598 # Hide stderr because it's normal for this to fail:
599 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
601 # Careful not to resolve a branch named abcdeff to commit 1234567:
602 $sha1 =~ /^$treeish/ &&
603 $sha1 =~ /^([0-9a-f]{40})$/s) {
605 Log(undef, "Commit $commit already present in $local_repo");
609 if (!defined $commit) {
610 # If $treeish isn't just a hash or abbrev hash, or isn't here
611 # yet, we need to fetch the remote to resolve it correctly.
613 # First, remove all local heads. This prevents a name that does
614 # not exist on the remote from resolving to (or colliding with)
615 # a previously fetched branch or tag (possibly from a different
617 remove_tree("$local_repo/refs/heads", {keep_root => 1});
619 Log(undef, "Fetching objects from $repo to $local_repo");
620 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
622 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
626 # Now that the data is all here, we will use our local repo for
627 # the rest of our git activities.
631 my $gitcmd = "git --git-dir=\Q$repo\E";
632 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
633 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
634 croak("`$gitcmd rev-list` exited "
636 .", '$treeish' not found, giving up");
639 Log(undef, "Version $treeish is commit $commit");
641 if ($commit ne $Job->{'script_version'}) {
642 # Record the real commit id in the database, frozentokey, logs,
643 # etc. -- instead of an abbreviation or a branch name which can
644 # become ambiguous or point to a different commit in the future.
645 if (!$Job->update_attributes('script_version' => $commit)) {
646 croak("Error: failed to update job's script_version attribute");
650 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
651 add_git_archive("$gitcmd archive ''\Q$commit\E");
654 my $git_archive = combined_git_archive();
655 if (!defined $git_archive) {
656 Log(undef, "Skip install phase (no git archive)");
658 Log(undef, "Warning: This probably means workers have no source tree!");
663 my $install_script_tries_left = 3;
664 for (my $attempts = 0; $attempts < 3; $attempts++) {
665 my @srunargs = ("srun",
666 "--nodelist=$nodelist",
667 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
668 my @execargs = ("sh", "-c",
669 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
671 $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
672 my ($stdout, $stderr);
673 ($exited, $stdout, $stderr) = srun_sync(
674 \@srunargs, \@execargs,
675 {label => "run install script on all workers"},
676 $build_script . $git_archive);
678 my $stderr_anything_from_script = 0;
679 for my $line (split(/\n/, $stderr)) {
680 if ($line !~ /^(srun: error: |starting: \[)/) {
681 $stderr_anything_from_script = 1;
685 last if $exited == 0 || $main::please_freeze;
687 # If the install script fails but doesn't print an error message,
688 # the next thing anyone is likely to do is just run it again in
689 # case it was a transient problem like "slurm communication fails
690 # because the network isn't reliable enough". So we'll just do
691 # that ourselves (up to 3 attempts in total). OTOH, if there is an
692 # error message, the problem is more likely to have a real fix and
693 # we should fail the job so the fixing process can start, instead
694 # of doing 2 more attempts.
695 last if $stderr_anything_from_script;
698 foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
699 unlink($tar_filename);
707 foreach (qw (script script_version script_parameters runtime_constraints))
711 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
713 foreach (split (/\n/, $Job->{knobs}))
715 Log (undef, "knob " . $_);
720 $main::success = undef;
726 my $thisround_succeeded = 0;
727 my $thisround_failed = 0;
728 my $thisround_failed_multiple = 0;
729 my $working_slot_count = scalar(@slot);
731 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
732 or $a <=> $b } @jobstep_todo;
733 my $level = $jobstep[$jobstep_todo[0]]->{level};
735 my $initial_tasks_this_level = 0;
736 foreach my $id (@jobstep_todo) {
737 $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
740 # If the number of tasks scheduled at this level #T is smaller than the number
741 # of slots available #S, only use the first #T slots, or the first slot on
742 # each node, whichever number is greater.
744 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
745 # based on these numbers. Using fewer slots makes more resources available
746 # to each individual task, which should normally be a better strategy when
747 # there are fewer of them running with less parallelism.
749 # Note that this calculation is not redone if the initial tasks at
750 # this level queue more tasks at the same level. This may harm
751 # overall task throughput for that level.
753 if ($initial_tasks_this_level < @node) {
754 @freeslot = (0..$#node);
755 } elsif ($initial_tasks_this_level < @slot) {
756 @freeslot = (0..$initial_tasks_this_level - 1);
758 @freeslot = (0..$#slot);
760 my $round_num_freeslots = scalar(@freeslot);
761 print STDERR "crunch-job has ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
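# Worked example: on 4 nodes with 8 CPUs each (32 slots), 2 initial tasks
# get the first 4 slots (one per node), 10 initial tasks get the first 10
# slots, and 50 initial tasks get all 32. The %round_max_slots loop below
# then records, for each node, the highest CPU number among the chosen
# slots, which is the value each task later sees as $CRUNCH_NODE_SLOTS.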
763 my %round_max_slots = ();
764 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
765 my $this_slot = $slot[$freeslot[$ii]];
766 my $node_name = $this_slot->{node}->{name};
767 $round_max_slots{$node_name} ||= $this_slot->{cpu};
768 last if (scalar(keys(%round_max_slots)) >= @node);
771 Log(undef, "start level $level with $round_num_freeslots slots");
774 my $progress_is_dirty = 1;
775 my $progress_stats_updated = 0;
777 update_progress_stats();
781 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
783 # Don't create new tasks if we already know the job's final result.
784 last if defined($main::success);
786 my $id = $jobstep_todo[$todo_ptr];
787 my $Jobstep = $jobstep[$id];
788 if ($Jobstep->{level} != $level)
793 pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
794 set_nonblocking($reader{$id});
796 my $childslot = $freeslot[0];
797 my $childnode = $slot[$childslot]->{node};
798 my $childslotname = join (".",
799 $slot[$childslot]->{node}->{name},
800 $slot[$childslot]->{cpu});
802 my $childpid = fork();
805 $SIG{'INT'} = 'DEFAULT';
806 $SIG{'QUIT'} = 'DEFAULT';
807 $SIG{'TERM'} = 'DEFAULT';
809 foreach (values (%reader))
813 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
814 open(STDOUT,">&writer");
815 open(STDERR,">&writer");
820 delete $ENV{"GNUPGHOME"};
821 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
822 $ENV{"TASK_QSEQUENCE"} = $id;
823 $ENV{"TASK_SEQUENCE"} = $level;
824 $ENV{"JOB_SCRIPT"} = $Job->{script};
825 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
826 $param =~ tr/a-z/A-Z/;
827 $ENV{"JOB_PARAMETER_$param"} = $value;
829 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
830 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
831 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
832 $ENV{"HOME"} = $ENV{"TASK_WORK"};
833 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
834 $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
835 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
837 my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
843 "--nodelist=".$childnode->{name},
844 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
845 "--job-name=$job_id.$id.$$",
848 my $stdbuf = " stdbuf --output=0 --error=0 ";
850 my $arv_file_cache = "";
851 if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
852 $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
856 "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
857 ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
858 ."&& cd \Q$ENV{CRUNCH_TMP}\E "
859 # These environment variables get used explicitly later in
860 # $command. No tool is expected to read these values directly.
861 .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
862 .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
863 ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
864 ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
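# Worked example (hypothetical node): with MemTotal 16384000 kB, SwapTotal
# 8192000 kB and CRUNCH_NODE_SLOTS=4, each task slot gets
#   MEMLIMIT  = (16384000 * 95) / (4 * 100) = 3891200 kB  (~3.7 GiB)
#   SWAPLIMIT = 3891200 + 8192000           = 12083200 kB
# which are applied below as --memory=${MEMLIMIT}k --memory-swap=${SWAPLIMIT}k
# when the Docker version supports swap limits.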
866 $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
867 $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
868 $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
872 my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
873 my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
874 $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
875 $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
876 # We only set memory limits if Docker lets us limit both memory and swap.
877 # Memory limits alone have been supported longer, but subprocesses tend
878 # to get SIGKILL if they exceed that without any swap limit set.
879 # See #5642 for additional background.
880 if ($docker_limitmem) {
881 $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
884 # The source tree and $destdir directory (which we have
885 # installed on the worker host) are available in the container,
886 # under the same path.
887 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
888 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
890 # Currently, we make the "by_pdh" directory in arv-mount's mount
891 # point appear at /keep inside the container (instead of using
892 # the same path as the host like we do with CRUNCH_SRC and
893 # CRUNCH_INSTALL). However, crunch scripts and utilities must
894 # not rely on this. They must use $TASK_KEEPMOUNT.
895 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
896 $ENV{TASK_KEEPMOUNT} = "/keep";
898 # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
899 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
900 $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
902 # TASK_WORK is almost exactly like a docker data volume: it
903 # starts out empty, is writable, and persists until no
904 # containers use it any more. We don't use --volumes-from to
905 # share it with other containers: it is only accessible to this
906 # task, and it goes away when this task stops.
908 # However, a docker data volume is writable only by root unless
909 # the mount point already happens to exist in the container with
910 # different permissions. Therefore, we [1] assume /tmp already
911 # exists in the image and is writable by the crunch user; [2]
912 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
913 # writable if they are created by docker while setting up the
914 # other --volumes); and [3] create $TASK_WORK inside the
915 # container using $build_script.
916 $command .= "--volume=/tmp ";
917 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
918 $ENV{"HOME"} = $ENV{"TASK_WORK"};
919 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
921 # TODO: Share a single JOB_WORK volume across all task
922 # containers on a given worker node, and delete it when the job
923 # ends (and, in case that doesn't work, when the next job
926 # For now, use the same approach as TASK_WORK above.
927 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
929 # Bind mount the crunchrunner binary and host TLS certificates file into
931 $command .= "--volume=\Q$ENV{HOST_CRUNCHRUNNER_BIN}:/usr/local/bin/crunchrunner\E ";
932 $command .= "--volume=\Q$ENV{HOST_CERTS}:/etc/arvados/ca-certificates.crt\E ";
934 while (my ($env_key, $env_val) = each %ENV)
936 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
937 $command .= "--env=\Q$env_key=$env_val\E ";
940 $command .= "--env=\QHOME=$ENV{HOME}\E ";
941 $command .= "\Q$docker_hash\E ";
943 if ($Job->{arvados_sdk_version}) {
945 $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
947 $command .= "/bin/sh -c \'python -c " .
948 '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
949 ">&2 2>/dev/null; " .
950 "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
951 "if which stdbuf >/dev/null ; then " .
952 " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
954 " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
959 $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
961 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
964 my @execargs = ('bash', '-c', $command);
965 srun (\@srunargs, \@execargs, undef, $build_script);
966 # exec() failed; we assume nothing happened.
967 die "srun() failed on build script\n";
970 if (!defined $childpid)
981 jobstepname => "$job_id.$id.$childpid",
983 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
984 $slot[$childslot]->{pid} = $childpid;
986 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
987 Log ($id, "child $childpid started on $childslotname");
988 $Jobstep->{starttime} = time;
989 $Jobstep->{node} = $childnode->{name};
990 $Jobstep->{slotindex} = $childslot;
991 delete $Jobstep->{stderr};
992 delete $Jobstep->{finishtime};
993 delete $Jobstep->{tempfail};
995 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
996 $Jobstep->{'arvados_task'}->save;
998 splice @jobstep_todo, $todo_ptr, 1;
1001 $progress_is_dirty = 1;
1005 ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1007 last THISROUND if $main::please_freeze;
1008 if ($main::please_info)
1010 $main::please_info = 0;
1012 create_output_collection();
1014 update_progress_stats();
1019 if (!$gotsome || ($latest_refresh + 2 < scalar time))
1021 check_refresh_wanted();
1023 update_progress_stats();
1025 elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1027 update_progress_stats();
1030 select (undef, undef, undef, 0.1);
1032 $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1033 $_->{node}->{hold_count} < 4 } @slot);
1034 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1035 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1037 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1038 .($thisround_failed+$thisround_succeeded)
1039 .") -- giving up on this round";
1040 Log (undef, $message);
1044 # move slots from freeslot to holdslot (or back to freeslot) if necessary
1045 for (my $i=$#freeslot; $i>=0; $i--) {
1046 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1047 push @holdslot, (splice @freeslot, $i, 1);
1050 for (my $i=$#holdslot; $i>=0; $i--) {
1051 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1052 push @freeslot, (splice @holdslot, $i, 1);
1056 # give up if no nodes are succeeding
1057 if ($working_slot_count < 1) {
1058 Log(undef, "Every node has failed -- giving up");
1065 push @freeslot, splice @holdslot;
1066 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1069 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1072 if ($main::please_continue) {
1073 $main::please_continue = 0;
1076 $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1078 if (!reapchildren())
1080 check_refresh_wanted();
1082 update_progress_stats();
1083 select (undef, undef, undef, 0.1);
1084 killem (keys %proc) if $main::please_freeze;
1088 update_progress_stats();
1089 freeze_if_want_freeze();
1092 if (!defined $main::success)
1094 if (!@jobstep_todo) {
1096 } elsif ($working_slot_count < 1) {
1097 save_output_collection();
1099 exit(EX_RETRY_UNLOCKED);
1100 } elsif ($thisround_succeeded == 0 &&
1101 ($thisround_failed == 0 || $thisround_failed > 4)) {
1102 my $message = "stop because $thisround_failed tasks failed and none succeeded";
1103 Log (undef, $message);
1108 goto ONELEVEL if !defined $main::success;
1111 release_allocation();
1113 my $collated_output = save_output_collection();
1114 Log (undef, "finish");
1119 if ($collated_output && $main::success) {
1120 $final_state = 'Complete';
1122 $final_state = 'Failed';
1124 $Job->update_attributes('state' => $final_state);
1126 exit (($final_state eq 'Complete') ? 0 : 1);
1130 sub update_progress_stats
1132 $progress_stats_updated = time;
1133 return if !$progress_is_dirty;
1134 my ($todo, $done, $running) = (scalar @jobstep_todo,
1135 scalar @jobstep_done,
1136 scalar keys(%proc));
1137 $Job->{'tasks_summary'} ||= {};
1138 $Job->{'tasks_summary'}->{'todo'} = $todo;
1139 $Job->{'tasks_summary'}->{'done'} = $done;
1140 $Job->{'tasks_summary'}->{'running'} = $running;
1141 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1142 Log (undef, "status: $done done, $running running, $todo todo");
1143 $progress_is_dirty = 0;
1150 my $children_reaped = 0;
1151 my @successful_task_uuids = ();
1153 while((my $pid = waitpid (-1, WNOHANG)) > 0)
1155 my $childstatus = $?;
1157 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1159 . $slot[$proc{$pid}->{slot}]->{cpu});
1160 my $jobstepidx = $proc{$pid}->{jobstepidx};
1162 if (!WIFEXITED($childstatus))
1164 # child did not exit (may be temporarily stopped)
1165 Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
1170 my $elapsed = time - $proc{$pid}->{time};
1171 my $Jobstep = $jobstep[$jobstepidx];
1173 my $exitvalue = $childstatus >> 8;
1174 my $exitinfo = "exit ".exit_status_s($childstatus);
1175 $Jobstep->{'arvados_task'}->reload;
1176 my $task_success = $Jobstep->{'arvados_task'}->{success};
1178 Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1180 if (!defined $task_success) {
1181 # task did not indicate one way or the other --> fail
1182 Log($jobstepidx, sprintf(
1183 "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1184 exit_status_s($childstatus)));
1185 $Jobstep->{'arvados_task'}->{success} = 0;
1186 $Jobstep->{'arvados_task'}->save;
1193 $temporary_fail ||= $Jobstep->{tempfail};
1194 $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1196 ++$thisround_failed;
1197 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1199 # Check for signs of a failed or misconfigured node
1200 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1201 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1202 # Don't count this against jobstep failure thresholds if this
1203 # node is already suspected faulty and srun exited quickly
1204 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1206 Log ($jobstepidx, "blaming failure on suspect node " .
1207 $slot[$proc{$pid}->{slot}]->{node}->{name});
1208 $temporary_fail ||= 1;
1210 ban_node_by_slot($proc{$pid}->{slot});
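# Example: with the threshold above, a 4-CPU node is banned (held for 60
# seconds by ban_node_by_slot) after 2+4 = 6 consecutive failed dispatches.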
1213 Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1214 ++$Jobstep->{'failures'},
1215 $temporary_fail ? 'temporary' : 'permanent',
1218 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1219 # Give up on this task, and the whole job
1222 # Put this task back on the todo queue
1223 push @jobstep_todo, $jobstepidx;
1224 $Job->{'tasks_summary'}->{'failed'}++;
1228 push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1229 ++$thisround_succeeded;
1230 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1231 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1232 $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1233 push @jobstep_done, $jobstepidx;
1234 Log ($jobstepidx, "success in $elapsed seconds");
1236 $Jobstep->{exitcode} = $childstatus;
1237 $Jobstep->{finishtime} = time;
1238 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1239 $Jobstep->{'arvados_task'}->save;
1240 process_stderr_final ($jobstepidx);
1241 Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1242 length($Jobstep->{'arvados_task'}->{output}),
1243 $Jobstep->{'arvados_task'}->{output}));
1245 close $reader{$jobstepidx};
1246 delete $reader{$jobstepidx};
1247 delete $slot[$proc{$pid}->{slot}]->{pid};
1248 push @freeslot, $proc{$pid}->{slot};
1251 $progress_is_dirty = 1;
1254 if (scalar(@successful_task_uuids) > 0)
1256 Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1258 my $newtask_list = [];
1259 my $newtask_results;
1261 $newtask_results = api_call(
1263 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1264 'order' => 'qsequence',
1265 'offset' => scalar(@$newtask_list),
1267 push(@$newtask_list, @{$newtask_results->{items}});
1268 } while (@{$newtask_results->{items}});
1269 Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1270 foreach my $arvados_task (@$newtask_list) {
1272 'level' => $arvados_task->{'sequence'},
1274 'arvados_task' => $arvados_task
1276 push @jobstep, $jobstep;
1277 push @jobstep_todo, $#jobstep;
1281 return $children_reaped;
1284 sub check_refresh_wanted
1286 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1288 $stat[9] > $latest_refresh &&
1289 # ...and we have actually locked the job record...
1290 $job_id eq $Job->{'uuid'}) {
1291 $latest_refresh = scalar time;
1292 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1293 for my $attr ('cancelled_at',
1294 'cancelled_by_user_uuid',
1295 'cancelled_by_client_uuid',
1297 $Job->{$attr} = $Job2->{$attr};
1299 if ($Job->{'state'} ne "Running") {
1300 if ($Job->{'state'} eq "Cancelled") {
1301 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1303 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1306 $main::please_freeze = 1;
1313 my $last_squeue_check = $squeue_checked;
1315 # Do not call `squeue` or check the kill list more than once every
1317 return if $last_squeue_check > time - 15;
1318 $squeue_checked = time;
1320 # Look for children from which we haven't received stderr data since
1321 # the last squeue check. If no such children exist, all procs are
1322 # alive and there's no need to even look at squeue.
1324 # As long as the crunchstat poll interval (10s) is shorter than the
1325 # squeue check interval (15s) this should make the squeue check an
1327 my $silent_procs = 0;
1328 for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1330 if (!exists($js->{stderr_at}))
1332 $js->{stderr_at} = 0;
1334 if ($js->{stderr_at} < $last_squeue_check)
1339 return if $silent_procs == 0;
1341 # use killem() on procs whose killtime is reached
1342 while (my ($pid, $procinfo) = each %proc)
1344 my $js = $jobstep[$procinfo->{jobstepidx}];
1345 if (exists $procinfo->{killtime}
1346 && $procinfo->{killtime} <= time
1347 && $js->{stderr_at} < $last_squeue_check)
1350 if ($js->{stderr_at}) {
1351 $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1353 Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1360 # here is an opportunity to check for mysterious problems with local procs
1364 # Get a list of steps still running. Note: squeue(1) says --steps
1365 # selects a format (which we override anyway) and allows us to
1366 # specify which steps we're interested in (which we don't).
1367 # Importantly, it also changes the meaning of %j from "job name" to
1368 # "step name" and (although this isn't mentioned explicitly in the
1369 # docs) switches from "one line per job" mode to "one line per step"
1370 # mode. Without it, we'd just get a list of one job, instead of a
1372 my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1375 Log(undef, "warning: squeue exit status $? ($!)");
1380 # which of my jobsteps are running, according to squeue?
1382 for my $jobstepname (@squeue)
1384 $ok{$jobstepname} = 1;
1387 # Check for child procs >60s old and not mentioned by squeue.
1388 while (my ($pid, $procinfo) = each %proc)
1390 if ($procinfo->{time} < time - 60
1391 && $procinfo->{jobstepname}
1392 && !exists $ok{$procinfo->{jobstepname}}
1393 && !exists $procinfo->{killtime})
1395 # According to slurm, this task has ended (successfully or not)
1396 # -- but our srun child hasn't exited. First we must wait (30
1397 # seconds) in case this is just a race between communication
1398 # channels. Then, if our srun child process still hasn't
1399 # terminated, we'll conclude some slurm communication
1400 # error/delay has caused the task to die without notifying srun,
1401 # and we'll kill srun ourselves.
1402 $procinfo->{killtime} = time + 30;
1403 Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1409 sub release_allocation
1413 Log (undef, "release job allocation");
1414 system "scancel $ENV{SLURM_JOB_ID}";
1423 my $sel = IO::Select->new();
1424 foreach my $jobstepidx (keys %reader)
1426 my $fd = $reader{$jobstepidx};
1428 $fd_job{$fd} = $jobstepidx;
1430 if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1431 $sel->add($stdout_fd);
1432 $fd_job{$stdout_fd} = $jobstepidx;
1435 # select on all reader fds with 0.1s timeout
1436 my @ready_fds = $sel->can_read(0.1);
1437 foreach my $fd (@ready_fds)
1440 if (0 < sysread ($fd, $buf, 65536))
1443 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1445 my $jobstepidx = $fd_job{$fd};
1446 if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1447 $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1451 $jobstep[$jobstepidx]->{stderr_at} = time;
1452 $jobstep[$jobstepidx]->{stderr} .= $buf;
1454 # Consume everything up to the last \n
1455 preprocess_stderr ($jobstepidx);
1457 if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1459 # If we get a lot of stderr without a newline, chop off the
1460 # front to avoid letting our buffer grow indefinitely.
1461 substr ($jobstep[$jobstepidx]->{stderr},
1462 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1470 # Consume all full lines of stderr for a jobstep. Everything after the
1471 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1473 sub preprocess_stderr
1475 my $jobstepidx = shift;
1477 while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1479 substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1480 Log ($jobstepidx, "stderr $line");
1481 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1483 $main::please_freeze = 1;
1485 elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
1486 # Skip the following tempfail checks if this srun proc isn't
1487 # attached to a particular worker slot.
1489 elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1490 my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1491 $slot[$job_slot_index]->{node}->{fail_count}++;
1492 $jobstep[$jobstepidx]->{tempfail} = 1;
1493 ban_node_by_slot($job_slot_index);
1495 elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1496 $jobstep[$jobstepidx]->{tempfail} = 1;
1497 ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
1499 elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1500 $jobstep[$jobstepidx]->{tempfail} = 1;
1506 sub process_stderr_final
1508 my $jobstepidx = shift;
1509 preprocess_stderr ($jobstepidx);
1512 Log ($jobstepidx, "stderr $_");
1513 } split ("\n", $jobstep[$jobstepidx]->{stderr});
1514 $jobstep[$jobstepidx]->{stderr} = '';
1521 if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1522 Log(undef, "fetch_block run error from arv-get $hash: $!");
1525 my $output_block = "";
1528 my $bytes = sysread($keep, $buf, 1024 * 1024);
1529 if (!defined $bytes) {
1530 Log(undef, "fetch_block read error from arv-get: $!");
1531 $output_block = undef;
1533 } elsif ($bytes == 0) {
1534 # sysread returns 0 at the end of the pipe.
1537 # some bytes were read into buf.
1538 $output_block .= $buf;
1543 Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1544 $output_block = undef;
1546 return $output_block;
1549 # Create a collection by concatenating the output of all tasks (each
1550 # task's output is either a manifest fragment, a locator for a
1551 # manifest fragment stored in Keep, or nothing at all). Return the
1552 # portable_data_hash of the new collection.
1553 sub create_output_collection
1555 Log (undef, "collate");
1557 my ($child_out, $child_in);
1558 my $pid = open2($child_out, $child_in, 'python', '-c', q{
1561 print (arvados.api("v1").collections().
1562 create(body={"manifest_text": sys.stdin.read()}).
1563 execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1567 my $manifest_size = 0;
1571 my $output = $_->{'arvados_task'}->{output};
1572 next if (!defined($output));
1574 if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1575 $next_write = fetch_block($output);
1577 $next_write = $output;
1579 if (defined($next_write)) {
1580 if (!defined(syswrite($child_in, $next_write))) {
1581 # There's been an error writing. Stop the loop.
1582 # We'll log details about the exit code later.
1585 $manifest_size += length($next_write);
1588 my $uuid = $_->{'arvados_task'}->{'uuid'};
1589 Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1594 Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1597 my $s = IO::Select->new($child_out);
1598 if ($s->can_read(120)) {
1599 sysread($child_out, $joboutput, 1024 * 1024);
1602 Log(undef, "output collection creation exited " . exit_status_s($?));
1608 Log (undef, "timed out while creating output collection");
1609 foreach my $signal (2, 2, 2, 15, 15, 9) {
1610 kill($signal, $pid);
1611 last if waitpid($pid, WNOHANG) == -1;
1620 # Calls create_output_collection, logs the result, and returns it.
1621 # If that was successful, save that as the output in the job record.
1622 sub save_output_collection {
1623 my $collated_output = create_output_collection();
1625 if (!$collated_output) {
1626 Log(undef, "Failed to write output collection");
1629 Log(undef, "job output $collated_output");
1630 $Job->update_attributes('output' => $collated_output);
1632 return $collated_output;
1639 my $sig = 2; # SIGINT first
1640 if (exists $proc{$_}->{"sent_$sig"} &&
1641 time - $proc{$_}->{"sent_$sig"} > 4)
1643 $sig = 15; # SIGTERM if SIGINT doesn't work
1645 if (exists $proc{$_}->{"sent_$sig"} &&
1646 time - $proc{$_}->{"sent_$sig"} > 4)
1648 $sig = 9; # SIGKILL if SIGTERM doesn't work
1650 if (!exists $proc{$_}->{"sent_$sig"})
1652 Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1654 select (undef, undef, undef, 0.1);
1657 kill $sig, $_; # srun wants two SIGINT to really interrupt
1659 $proc{$_}->{"sent_$sig"} = time;
1660 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1670 vec($bits,fileno($_),1) = 1;
1676 # Send log output to Keep via arv-put.
1678 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1679 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1680 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1681 # $log_pipe_pid is the pid of the arv-put subprocess.
1683 # The only functions that should access these variables directly are:
1685 # log_writer_start($logfilename)
1686 # Starts an arv-put pipe, reading data on stdin and writing it to
1687 # a $logfilename file in an output collection.
1689 # log_writer_read_output([$timeout])
1690 # Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1691 # Passes $timeout to the select() call, with a default of 0.01.
1692 # Returns the result of the last read() call on $log_pipe_out, or
1693 # -1 if read() wasn't called because select() timed out.
1694 # Only other log_writer_* functions should need to call this.
1696 # log_writer_send($txt)
1697 # Writes $txt to the output log collection.
1699 # log_writer_finish()
1700 # Closes the arv-put pipe and returns the output that it produces.
1702 # log_writer_is_active()
1703 # Returns a true value if there is currently a live arv-put
1704 # process, false otherwise.
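# Illustrative lifecycle of the functions described above (comments only,
# mirroring the real call sites elsewhere in this script):
#
#   log_writer_start($job_id . '.log.txt');
#   log_writer_send($datetime . " " . $message) if log_writer_is_active();
#   my $log_manifest = log_writer_finish();  # manifest text from arv-put,
#                                            # or undef if the writer failed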
1706 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1709 sub log_writer_start($)
1711 my $logfilename = shift;
1712 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1716 '--filename', $logfilename,
1718 $log_pipe_out_buf = "";
1719 $log_pipe_out_select = IO::Select->new($log_pipe_out);
1722 sub log_writer_read_output {
1723 my $timeout = shift || 0.01;
1725 while ($read && $log_pipe_out_select->can_read($timeout)) {
1726 $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1727 length($log_pipe_out_buf));
1729 if (!defined($read)) {
1730 Log(undef, "error reading log manifest from arv-put: $!");
1735 sub log_writer_send($)
1738 print $log_pipe_in $txt;
1739 log_writer_read_output();
1742 sub log_writer_finish()
1744 return unless $log_pipe_pid;
1746 close($log_pipe_in);
1748 my $logger_failed = 0;
1749 my $read_result = log_writer_read_output(120);
1750 if ($read_result == -1) {
1751 $logger_failed = -1;
1752 Log (undef, "timed out reading from 'arv-put'");
1753 } elsif ($read_result != 0) {
1754 $logger_failed = -2;
1755 Log(undef, "failed to read arv-put log manifest to EOF");
1758 waitpid($log_pipe_pid, 0);
1760 $logger_failed ||= $?;
1761 Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1764 close($log_pipe_out);
1765 my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1766 $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1767 $log_pipe_out_select = undef;
1769 return $arv_put_output;
1772 sub log_writer_is_active() {
1773 return $log_pipe_pid;
1776 sub Log # ($jobstepidx, $logmessage)
1778 my ($jobstepidx, $logmessage) = @_;
1779 if ($logmessage =~ /\n/) {
1780 for my $line (split (/\n/, $_[1])) {
1781 Log ($jobstepidx, $line);
1785 my $fh = select STDERR; $|=1; select $fh;
1787 if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1788 $task_qseq = $jobstepidx;
1790 my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1791 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1794 if (log_writer_is_active() || -t STDERR) {
1795 my @gmtime = gmtime;
1796 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1797 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1799 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1801 if (log_writer_is_active()) {
1802 log_writer_send($datetime . " " . $message);
1809 my ($package, $file, $line) = caller;
1810 my $message = "@_ at $file line $line\n";
1811 Log (undef, $message);
1812 freeze() if @jobstep_todo;
1813 create_output_collection() if @jobstep_todo;
1823 if ($Job->{'state'} eq 'Cancelled') {
1824 $Job->update_attributes('finished_at' => scalar gmtime);
1826 $Job->update_attributes('state' => 'Failed');
1833 my $justcheckpoint = shift; # false if this will be the last meta saved
1834 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1835 return unless log_writer_is_active();
1836 my $log_manifest = log_writer_finish();
1837 return unless defined($log_manifest);
1840 my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1841 $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1844 my $log_coll = api_call(
1845 "collections/create", ensure_unique_name => 1, collection => {
1846 manifest_text => $log_manifest,
1847 owner_uuid => $Job->{owner_uuid},
1848 name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1850 Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1851 $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1855 sub freeze_if_want_freeze
1857 if ($main::please_freeze)
1859 release_allocation();
1862 # kill some srun procs before freeze+stop
1863 map { $proc{$_} = {} } @_;
1866 killem (keys %proc);
1867 select (undef, undef, undef, 0.1);
1869 while (($died = waitpid (-1, WNOHANG)) > 0)
1871 delete $proc{$died};
1876 create_output_collection();
1886 Log (undef, "Freeze not implemented");
1893 croak ("Thaw not implemented");
1909 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1916 my $srunargs = shift;
1917 my $execargs = shift;
1918 my $opts = shift || {};
1921 my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1922 Log (undef, "$label: start");
1924 my ($stderr_r, $stderr_w);
1925 pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1927 my ($stdout_r, $stdout_w);
1928 pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1930 my $srunpid = fork();
1935 fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1936 fcntl($stdout_w, F_SETFL, 0) or croak($!);
1937 open(STDERR, ">&", $stderr_w);
1938 open(STDOUT, ">&", $stdout_w);
1939 srun ($srunargs, $execargs, $opts, $stdin);
1945 set_nonblocking($stderr_r);
1946 set_nonblocking($stdout_r);
1948 # Add entries to @jobstep and %proc so check_squeue() and
1949 # freeze_if_want_freeze() can treat it like a job task process.
1953 stderr_captured => '',
1954 stdout_r => $stdout_r,
1955 stdout_captured => '',
1957 my $jobstepidx = $#jobstep;
1959 jobstepidx => $jobstepidx,
1961 $reader{$jobstepidx} = $stderr_r;
1963 while ($srunpid != waitpid ($srunpid, WNOHANG)) {
1964 my $busy = readfrompipes();
1965 if (!$busy || ($latest_refresh + 2 < scalar time)) {
1966 check_refresh_wanted();
1970 select(undef, undef, undef, 0.1);
1972 killem(keys %proc) if $main::please_freeze;
1976 1 while readfrompipes();
1977 process_stderr_final ($jobstepidx);
1979 Log (undef, "$label: exit ".exit_status_s($exited));
1983 delete $proc{$srunpid};
1984 delete $reader{$jobstepidx};
1986 my $j = pop @jobstep;
1987 return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
1993 my $srunargs = shift;
1994 my $execargs = shift;
1995 my $opts = shift || {};
1997 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1999 $Data::Dumper::Terse = 1;
2000 $Data::Dumper::Indent = 0;
2001 my $show_cmd = Dumper($args);
2002 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2003 $show_cmd =~ s/\n/ /g;
2004 if ($opts->{fork}) {
2005 Log(undef, "starting: $show_cmd");
2007 # This is a child process: parent is in charge of reading our
2008 # stderr and copying it to Log() if needed.
2009 warn "starting: $show_cmd\n";
2012 if (defined $stdin) {
2013 my $child = open STDIN, "-|";
2014 defined $child or die "no fork: $!";
2016 print $stdin or die $!;
2017 close STDOUT or die $!;
2022 return system (@$args) if $opts->{fork};
2025 warn "ENV size is ".length(join(" ",%ENV));
2026 die "exec failed: $!: @$args";
2030 sub ban_node_by_slot {
2031 # Don't start any new jobsteps on this node for 60 seconds
2033 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2034 $slot[$slotid]->{node}->{hold_count}++;
2035 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2040 my ($lockfile, $error_message) = @_;
2041 open L, ">", $lockfile or croak("$lockfile: $!");
2042 if (!flock L, LOCK_EX|LOCK_NB) {
2043 croak("Can't lock $lockfile: $error_message\n");
2047 sub find_docker_image {
2048 # Given a Keep locator, check to see if it contains a Docker image.
2049 # If so, return its stream name and Docker hash.
2050 # If not, return undef for both values.
2051 my $locator = shift;
2052 my ($streamname, $filename);
2053 my $image = api_call("collections/get", uuid => $locator);
2055 foreach my $line (split(/\n/, $image->{manifest_text})) {
2056 my @tokens = split(/\s+/, $line);
2058 $streamname = shift(@tokens);
2059 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2060 if (defined($filename)) {
2061 return (undef, undef); # More than one file in the Collection.
2063 $filename = (split(/:/, $filedata, 3))[2];
2068 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
2069 return ($streamname, $1);
2071 return (undef, undef);
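# Example (hypothetical manifest): a collection containing exactly one file,
# with a manifest line of the form
#   . [block locator] 0:12345:{64 hex digits}.tar
# yields the stream name "." and the 64-hex-digit Docker hash; a collection
# with more than one file, or a file not named {64 hex digits}.tar, yields
# (undef, undef) and the caller croaks that no Docker image hash was found.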
2076 # Calculate the number of times an operation should be retried,
2077 # assuming exponential backoff, and that we're willing to retry as
2078 # long as tasks have been running. Enforce a minimum of 3 retries.
2079 my ($starttime, $endtime, $timediff, $retries);
2081 $starttime = $jobstep[0]->{starttime};
2082 $endtime = $jobstep[-1]->{finishtime};
2084 if (!defined($starttime)) {
2086 } elsif (!defined($endtime)) {
2087 $timediff = time - $starttime;
2089 $timediff = ($endtime - $starttime) - (time - $endtime);
2091 if ($timediff > 0) {
2092 $retries = int(log($timediff) / log(2));
2094 $retries = 1; # Use the minimum.
2096 return ($retries > 3) ? $retries : 3;
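# Worked example: if tasks have been running for ~1000 seconds,
# int(log(1000)/log(2)) == 9, so up to 9 retries are allowed; for any
# elapsed time under 16 seconds the minimum of 3 retries applies.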
2100 # Pass in two function references.
2101 # The first function will be called with the remaining arguments.
2102 # If it dies, retry it with exponential backoff until it succeeds,
2103 # or until the current retry_count is exhausted. After each failure
2104 # that can be retried, the second function will be called with
2105 # the current try count (0-based), next try time, and error message.
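# Illustrative call (upload_chunk and $chunk are hypothetical):
#   retry_op(sub { upload_chunk(@_); },
#            sub { my ($try, $next_try_at, $err) = @_;
#                  Log(undef, "attempt $try failed: $err"); },
#            $chunk);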
2106 my $operation = shift;
2107 my $retry_callback = shift;
2108 my $retries = retry_count();
2109 foreach my $try_count (0..$retries) {
2110 my $next_try = time + (2 ** $try_count);
2111 my $result = eval { $operation->(@_); };
2114 } elsif ($try_count < $retries) {
2115 $retry_callback->($try_count, $next_try, $@);
2116 my $sleep_time = $next_try - time;
2117 sleep($sleep_time) if ($sleep_time > 0);
2120 # Ensure the error message ends in a newline, so Perl doesn't add
2121 # retry_op's line number to it.
2127 # Pass in a /-separated API method name, and arguments for it.
2128 # This function will call that method, retrying as needed until
2129 # the current retry_count is exhausted, logging each failed attempt before retrying.
2130 my $method_name = shift;
2131 my $log_api_retry = sub {
2132 my ($try_count, $next_try_at, $errmsg) = @_;
2133 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2134 $errmsg =~ s/\s/ /g;
2135 $errmsg =~ s/\s+$//;
2137 if ($next_try_at < time) {
2138 $retry_msg = "Retrying.";
2140 my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2141 $retry_msg = "Retrying at $next_try_fmt.";
2143 Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2146 foreach my $key (split(/\//, $method_name)) {
2147 $method = $method->{$key};
2149 return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
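# The slash-separated name selects nested method objects on the API client,
# so api_call("collections/get", uuid => $locator) -- as used in
# find_docker_image() above -- ends up retrying
# $method->execute(uuid => $locator) with exponential backoff and a log
# message for each failed attempt.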
2153 # Given a $?, return a human-readable exit code string like "0" or
2154 # "1" or "0 with signal 1" or "1 with signal 11".
2155 my $exitcode = shift;
2156 my $s = $exitcode >> 8;
2157 if ($exitcode & 0x7f) {
2158 $s .= " with signal " . ($exitcode & 0x7f);
2160 if ($exitcode & 0x80) {
2161 $s .= " with core dump";
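# Worked examples: $? == 256 (exit 1) yields "1"; $? == 11 (killed by
# SIGSEGV, no core) yields "0 with signal 11"; $? == 139 (signal 11 plus
# the core-dump bit) yields "0 with signal 11 with core dump".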
2166 sub handle_readall {
2167 # Pass in a glob reference to a file handle.
2168 # Read all its contents and return them as a string.
2169 my $fh_glob_ref = shift;
2171 return <$fh_glob_ref>;
2174 sub tar_filename_n {
2176 return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
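# e.g. with CRUNCH_TMP=/tmp/crunch-job and a hypothetical job id of
# "example-job", tar_filename_n(2) returns
# "/tmp/crunch-job/git.example-job.2.tar".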
2179 sub add_git_archive {
2180 # Pass in a git archive command as a string or list, a la system().
2181 # This method will save its output to be included in the archive sent to the build script.
2185 if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2186 croak("Failed to save git archive: $!");
2188 my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2190 waitpid($git_pid, 0);
2193 croak("Failed to save git archive: git exited " . exit_status_s($?));
2197 sub combined_git_archive {
2198 # Combine all saved tar archives into a single archive, then return its
2199 # contents in a string. Return undef if no archives have been saved.
2200 if ($git_tar_count < 1) {
2203 my $base_tar_name = tar_filename_n(1);
2204 foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2205 my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2206 if ($tar_exit != 0) {
2207 croak("Error preparing build archive: tar -A exited " .
2208 exit_status_s($tar_exit));
2211 if (!open(GIT_TAR, "<", $base_tar_name)) {
2212 croak("Could not open build archive: $!");
2214 my $tar_contents = handle_readall(\*GIT_TAR);
2216 return $tar_contents;
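# Design note: "tar -A" appends the members of each later archive onto the
# first one, so the dispatch script below can unpack the job source and any
# bundled SDKs from a single DATA stream (see the --ignore-zeros handling
# in the extraction code).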
2219 sub set_nonblocking {
2221 my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2222 fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
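# set_nonblocking() is applied to the pipe read ends above so the
# readfrompipes() polling loop can drain partial output without blocking.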
2228 # This is crunch-job's internal dispatch script. crunch-job running on the API
2229 # server invokes this script on individual compute nodes, or localhost if we're
2230 # running a job locally. It gets called in two modes:
2232 # * No arguments: Installation mode. Read a tar archive from the DATA
2233 # file handle; it includes the Crunch script's source code, and
2234 # maybe SDKs as well. Those should be installed in the proper
2235 # locations. This runs outside of any Docker container, so don't try to
2236 # introspect Crunch's runtime environment.
2238 # * With arguments: Crunch script run mode. This script should set up the
2239 # environment, then run the command specified in the arguments. This runs
2240 # inside any Docker container.
2243 use File::Path qw( make_path remove_tree );
2244 use POSIX qw(getcwd);
2246 use constant TASK_TEMPFAIL => 111;
2248 # Map SDK subdirectories to the path environments they belong to.
2249 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
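# e.g. an SDK installed under "$install_dir/perl/lib" is prepended to
# PERLLIB in run mode below, so the Crunch script finds the bundled SDK
# ahead of anything already installed on the node or image.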
2251 my $destdir = $ENV{"CRUNCH_SRC"};
2252 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2253 my $repo = $ENV{"CRUNCH_SRC_URL"};
2254 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2255 my $job_work = $ENV{"JOB_WORK"};
2256 my $task_work = $ENV{"TASK_WORK"};
2258 open(STDOUT_ORIG, ">&", STDOUT);
2259 open(STDERR_ORIG, ">&", STDERR);
2261 for my $dir ($destdir, $job_work, $task_work) {
2264 -e $dir or die "Failed to create temporary directory ($dir): $!";
2269 remove_tree($task_work, {keep_root => 1});
2272 ### Crunch script run mode
2274 # We want to do routine logging during task 0 only. This gives the user
2275 # the information they need, but avoids repeating the information for every task.
2278 if ($ENV{TASK_SEQUENCE} eq "0") {
2281 printf STDERR_ORIG "[Crunch] $msg\n", @_;
2287 my $python_src = "$install_dir/python";
2288 my $venv_dir = "$job_work/.arvados.venv";
2289 my $venv_built = -e "$venv_dir/bin/activate";
2290 if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2291 shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2292 "--python=python2.7", $venv_dir);
2293 shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2295 $Log->("Built Python SDK virtualenv");
2298 my @pysdk_version_cmd = ("python", "-c",
2299 "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2301 $Log->("Running in Python SDK virtualenv");
2302 @pysdk_version_cmd = ();
2303 my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2304 @ARGV = ("/bin/sh", "-ec",
2305 ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2306 } elsif (-d $python_src) {
2307 $Log->("Warning: virtualenv not found inside Docker container default " .
2308 "\$PATH. Can't install Python SDK.");
2311 if (@pysdk_version_cmd) {
2312 open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2313 my $pysdk_version = <$pysdk_version_pipe>;
2314 close($pysdk_version_pipe);
2316 chomp($pysdk_version);
2317 $Log->("Using Arvados SDK version $pysdk_version");
2319 # A lot could've gone wrong here, but pretty much all of it means that
2320 # Python won't be able to load the Arvados SDK.
2321 $Log->("Warning: Arvados SDK not found");
2325 while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2326 my $sdk_path = "$install_dir/$sdk_dir";
2328 if ($ENV{$sdk_envkey}) {
2329 $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2331 $ENV{$sdk_envkey} = $sdk_path;
2333 $Log->("Arvados SDK added to %s", $sdk_envkey);
2338 die "Cannot exec `@ARGV`: $!";
2341 ### Installation mode
2342 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2344 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2345 # This exact git archive (source + arvados sdk) is already installed
2346 # here, so there's no need to reinstall it.
2348 # We must consume our DATA section, though: otherwise the process
2349 # feeding it to us will get SIGPIPE.
2351 while (read(DATA, $buf, 65536)) { }
2356 unlink "$destdir.archive_hash";
2360 # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2361 local $SIG{PIPE} = "IGNORE";
2362 warn "Extracting archive: $archive_hash\n";
2363 # --ignore-zeros is necessary sometimes: depending on how much NUL
2364 # padding tar -A put on our combined archive (which in turn depends
2365 # on the length of the component archives) tar without
2366 # --ignore-zeros will exit before consuming stdin and cause close()
2367 # to fail on the resulting SIGPIPE.
2368 if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2369 die "Error launching 'tar -xC $destdir': $!";
2371 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2372 # get SIGPIPE. We must feed it data incrementally.
2374 while (read(DATA, $tar_input, 65536)) {
2375 print TARX $tar_input;
2378 die "'tar -xC $destdir' exited $?: $!";
2384 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2386 foreach my $sdk_lang (("python",
2387 map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2388 if (-d "$sdk_root/$sdk_lang") {
2389 if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2390 die "Failed to install $sdk_lang SDK: $!";
2396 my $python_dir = "$install_dir/python";
2397 if ((-d $python_dir) and can_run("python2.7")) {
2398 open(my $egg_info_pipe, "-|",
2399 "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2400 my @egg_info_errors = <$egg_info_pipe>;
2401 close($egg_info_pipe);
2404 if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2405 # egg_info apparently failed because it couldn't ask git for a build tag.
2406 # Specify no build tag.
2407 open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2408 print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2411 my $egg_info_exit = $? >> 8;
2412 foreach my $errline (@egg_info_errors) {
2415 warn "python setup.py egg_info failed: exit $egg_info_exit";
2416 exit ($egg_info_exit || 1);
2421 # Hide messages from the install script (unless it fails: shell_or_die
2422 # will show $destdir.log in that case).
2423 open(STDOUT, ">>", "$destdir.log");
2424 open(STDERR, ">&", STDOUT);
2426 if (-e "$destdir/crunch_scripts/install") {
2427 shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2428 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2430 shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2431 } elsif (-e "./install.sh") {
2432 shell_or_die (undef, "./install.sh", $install_dir);
2435 if ($archive_hash) {
2436 unlink "$destdir.archive_hash.new";
2437 symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2438 rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2444 my $command_name = shift;
2445 open(my $which, "-|", "which", $command_name);
2446 while (<$which>) { }
2453 my $exitcode = shift;
2455 if ($ENV{"DEBUG"}) {
2456 print STDERR "@_\n";
2458 if (system (@_) != 0) {
2461 my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2462 open STDERR, ">&STDERR_ORIG";
2463 system ("cat $destdir.log >&2");
2464 warn "@_ failed ($err): $exitstatus";
2465 if (defined($exitcode)) {
2469 exit (($code >> 8) || 1);