2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs locally.
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when the job finishes.
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
71 Save a checkpoint and continue.
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
91 use Digest::MD5 qw(md5_hex);
97 use File::Path qw( make_path remove_tree );
# Exit statuses used by this script. In the visible lines EX_TEMPFAIL is
# reported when locking the job fails, and EX_RETRY_UNLOCKED is passed to
# exit() after the work-dir cleanup and Docker install steps (presumably on
# failure; the guarding conditions are not visible). TASK_TEMPFAIL is not
# referenced in the visible lines.
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
# Scratch-directory layout. TMPDIR defaults to /tmp; unless the caller has
# already exported CRUNCH_TMP it becomes $TMPDIR/crunch-job.
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# NOTE(review): $ENV{"USER"} can be undefined (e.g. under some init systems),
# in which case this `ne` comparison warns about an uninitialized value.
106 if ($ENV{"USER"} ne "crunch" && $< != 0) {
107 # use a tmp dir unique for my uid
108 $ENV{"CRUNCH_TMP"} .= "-$<";
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# JOB_WORK and CRUNCH_INSTALL both live under CRUNCH_TMP; CRUNCH_WORK is a
# deprecated alias for JOB_WORK.
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# NOTE(review): the result of this mkdir is not checked, so a failure (or an
# already-existing directory) passes silently.
120 mkdir ($ENV{"JOB_WORK"});
# Command-line handling. --docker-bin defaults to "docker.io" and
# --docker-run-args to the empty string; the remaining option variables are
# declared outside the visible lines.
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132 'git-dir=s' => \$git_dir,
133 'job=s' => \$jobspec,
134 'job-api-token=s' => \$job_api_token,
135 'no-clear-tmp' => \$no_clear_tmp,
136 'resume-stash=s' => \$resume_stash,
137 'docker-bin=s' => \$docker_bin,
138 'docker-run-args=s' => \$docker_run_args,
# A token given with --job-api-token overrides ARVADOS_API_TOKEN in the
# environment for everything that follows.
141 if (defined $job_api_token) {
142 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# SLURM is considered available only when both variables are present.
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
# CRUNCH_DEBUG is forced to 1 or 0; the condition selecting between the two
# assignments is not in the visible lines.
150 $main::ENV{CRUNCH_DEBUG} = 1;
154 $main::ENV{CRUNCH_DEBUG} = 0;
# API client pinned to API version v1.
157 my $arv = Arvados->new('apiVersion' => 'v1');
# Decide how the job was specified. A --job value made up solely of lowercase
# letters, digits and hyphens is taken to be a job UUID and the record is
# fetched from the API.
166 if ($jobspec =~ /^[-a-z\d]+$/)
168 # $jobspec is an Arvados UUID, not a JSON job specification
169 $Job = api_call("jobs/get", uuid => $jobspec);
# Any other value is decoded as a JSON job description (presumably the else
# branch of the test above; the branch keyword is not in the visible lines).
174 $local_job = JSON::decode_json($jobspec);
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely misconfigured.
# Jobs that name a Docker image are checked with "<docker-bin> ps -q"; the
# command used otherwise is assigned outside the visible lines.
182 if (($Job || $local_job)->{docker_image_locator}) {
183 $cmd = [$docker_bin, 'ps', '-q'];
185 Log(undef, "Sanity check is `@$cmd`");
# One task per node on SLURM_NNODES nodes.
186 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
190 Log(undef, "Sanity check failed: ".exit_status_s($?));
193 Log(undef, "Sanity check OK");
# Take ownership of the job: either lock an existing job record or create a
# new one from the locally supplied description.
196 my $User = api_call("users/current");
199 if (!$force_unlock) {
200 # Claim this job, and make sure nobody else does
# The lock call is wrapped in eval so that a failure can be logged below
# instead of propagating directly.
201 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
203 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
# Locally supplied job: every listed attribute must be present and true.
# (map is used here only for its side effect of calling croak.)
212 map { croak ("No $_ specified") unless $local_job->{$_} }
213 qw(script script_version script_parameters);
216 $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
# NOTE(review): gmtime in scalar context yields a ctime-style string such as
# "Thu Sep  9 12:00:00 2021", not an ISO 8601 timestamp -- confirm the API
# accepts this form.
217 $local_job->{'started_at'} = gmtime;
218 $local_job->{'state'} = 'Running';
220 $Job = api_call("jobs/create", job => $local_job);
222 $job_id = $Job->{'uuid'};
# The log file is named after the job UUID.
224 my $keep_logfile = $job_id . '.log.txt';
225 log_writer_start($keep_logfile);
# Default the runtime constraints. max_tasks_per_node defaults to 0, which
# the slot-allocation code treats as "no cap" because it only applies
# $max_ncpus when it is true.
227 $Job->{'runtime_constraints'} ||= {};
228 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
229 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Describe the installed arvados-cli gem version(s) for the startup log line.
# Errors from `gem` are discarded, so the string may be empty.
231 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
233 $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
234 chomp($gem_versions);
235 chop($gem_versions); # Closing parentheses
# Tail of a log call whose opening is not visible: reports the resolved path
# of this script, or "stdin" when $0 does not exist as a file.
240 "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
# Build the node and slot tables (@node, @slot) from either the local machine
# or the SLURM allocation.
242 Log (undef, "check slurm allocation");
245 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Local CPU count: number of "processor" lines in /proc/cpuinfo. The
# expression parses as (0 + count) || 1, so a zero or failed count becomes 1.
249 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
250 push @sinfo, "$localcpus localhost";
252 if (exists $ENV{SLURM_NODELIST})
# Each sinfo output line is "<cpus> <nodelist>", the same shape as the
# localhost entry above.
254 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
258 my ($ncpus, $slurm_nodelist) = split;
# Apply the job's max_tasks_per_node cap, if one was given (non-zero).
259 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated entry at a time off the front of the node list;
# an entry may carry a bracketed range expression.
262 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
# Bracketed form: a comma-separated list of numbers and number ranges.
265 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
268 foreach (split (",", $ranges))
# Expand each number by substituting it for the bracket expression.
281 push @nodelist, map {
283 $n =~ s/\[[-,\d]+\]/$_/;
# Entry handled without range expansion: used as-is.
290 push @nodelist, $nodelist;
293 foreach my $nodename (@nodelist)
295 Log (undef, "node $nodename - $ncpus slots");
296 my $node = { name => $nodename,
298 # The number of consecutive times a task has been dispatched
299 # to this node and failed.
301 # The number of consecutive times that SLURM has reported
302 # a node failure since the last successful task.
304 # Don't dispatch work to this node until this time
305 # (in seconds since the epoch) has passed.
# One slot per usable CPU; slot CPU numbers start at 1.
307 foreach my $cpu (1..$ncpus)
309 push @slot, { node => $node,
313 push @node, @nodelist;
318 # Ensure that we get one jobstep running on each allocated node before
319 # we start overloading nodes with concurrent steps
# Ordering by CPU number puts every node's slot 1 ahead of any node's slot 2.
321 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
# Reset the job's task summary, install signal handlers and initialize the
# bookkeeping used by the main dispatch loop.
324 $Job->update_attributes(
325 'tasks_summary' => { 'failed' => 0,
330 Log (undef, "start");
# Signal handlers only set flags; the flags are acted on from the main loops.
# INT, QUIT and TSTP request a freeze, ALRM requests an info dump, CONT sets
# the continue flag and HUP requests a refresh. TERM aborts via croak.
331 $SIG{'INT'} = sub { $main::please_freeze = 1; };
332 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
333 $SIG{'TERM'} = \&croak;
334 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
335 $SIG{'ALRM'} = sub { $main::please_info = 1; };
336 $SIG{'CONT'} = sub { $main::please_continue = 1; };
337 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
339 $main::please_freeze = 0;
340 $main::please_info = 0;
341 $main::please_continue = 0;
342 $main::please_refresh = 0;
343 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Export every "NOCACHE...=value" line found in the job's knobs into the
# environment. (grep is used only for its side effect here.)
345 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
346 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
347 $ENV{"JOB_UUID"} = $job_id;
# Task queues hold indexes into @jobstep.
350 my @jobstep_todo = ();
351 my @jobstep_done = ();
352 my @jobstep_tomerge = ();
353 my $jobstep_tomerge_level = 0;
354 my $squeue_checked = 0;
355 my $latest_refresh = scalar time;
# Resume from a saved state when the job record names one ...
359 if (defined $Job->{thawedfromkey})
361 thaw ($Job->{thawedfromkey});
# ... otherwise (presumably the else branch; keyword not visible) create the
# initial task and queue it as jobstep 0 at level 0.
365 my $first_task = api_call("job_tasks/create", job_task => {
366 'job_uuid' => $Job->{'uuid'},
371 push @jobstep, { 'level' => 0,
373 'arvados_task' => $first_task,
375 push @jobstep_todo, 0;
# Take the per-CRUNCH_TMP lock, load the build script that follows this
# program's DATA marker, and (unless --no-clear-tmp was given) wipe the work
# directories on every node.
381 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
384 my $build_script = handle_readall(\*DATA);
385 my $nodelist = join(",", @node);
386 my $git_tar_count = 0;
388 if (!defined $no_clear_tmp) {
389 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
390 Log (undef, "Clean work dirs");
# The cleanup runs in a forked child so the parent can keep polling for
# freeze requests while it waits.
392 my $cleanpid = fork();
395 # Find FUSE mounts under $CRUNCH_TMP and unmount them.
396 # Then clean up work directories.
397 # TODO: When #5036 is done and widely deployed, we can limit mount's
398 # -t option to simply fuse.keep.
# The shell script below is a single-quoted Perl string, so $CRUNCH_TMP,
# $JOB_WORK and $CRUNCH_INSTALL are expanded by bash on the worker (from the
# environment), not interpolated by Perl.
399 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
400 ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
# Parent: poll every 0.1 s until the cleanup child has been reaped.
405 last if $cleanpid == waitpid (-1, WNOHANG);
406 freeze_if_want_freeze ($cleanpid);
407 select (undef, undef, undef, 0.1);
# Reports the cleanup exit status and exits with EX_RETRY_UNLOCKED; the
# condition guarding these two lines (presumably a non-zero status) is not
# visible.
410 Log(undef, "Clean work dirs: exit ".exit_status_s($?));
411 exit(EX_RETRY_UNLOCKED);
415 # If this job requires a Docker image, install that.
416 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
417 if ($docker_locator = $Job->{docker_image_locator}) {
418 Log (undef, "Install docker image $docker_locator");
419 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
422 croak("No Docker image hash found from locator $docker_locator");
424 Log (undef, "docker image hash is $docker_hash");
# Drop the leading "." of the stream name so it can be appended to the
# locator as a path.
425 $docker_stream =~ s/^\.//;
# Install script: load the image tarball from Keep with arv-get unless an
# image with exactly this hash is already listed by the Docker daemon.
426 my $docker_install_script = qq{
427 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
428 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
431 my $docker_pid = fork();
# NOTE(review): fork() returns undef on failure and undef == 0 is true, so a
# failed fork would take the child branch here -- confirm whether a defined()
# check is wanted.
432 if ($docker_pid == 0)
434 srun (["srun", "--nodelist=" . join(',', @node)],
435 ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script]);
# Parent: poll every 0.1 s until the install child has been reaped.
440 last if $docker_pid == waitpid (-1, WNOHANG);
441 freeze_if_want_freeze ($docker_pid);
442 select (undef, undef, undef, 0.1);
# Reports the install exit status and exits with EX_RETRY_UNLOCKED; the
# guarding condition (presumably a non-zero status) is not visible.
446 Log(undef, "Installing Docker image from $docker_locator exited " . exit_status_s($?));
447 exit(EX_RETRY_UNLOCKED);
450 # Determine whether this version of Docker supports memory+swap limits.
# Probed on the first node only, by grepping the "run --help" output.
451 srun(["srun", "--nodelist=" . $node[0]],
452 ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
454 $docker_limitmem = ($? == 0);
456 # Find a non-root Docker user to use.
457 # Tries the default user for the container, then 'crunch', then 'nobody',
458 # testing for whether the actual user id is non-zero. This defends against
459 # mistakes but not malice, but we intend to harden the security in the future
460 # so we don't want anyone getting used to their jobs running as root in their Docker containers.
# The empty string stands for "container default user" (no --user argument).
462 my @tryusers = ("", "crunch", "nobody");
463 foreach my $try_user (@tryusers) {
465 if ($try_user eq "") {
466 Log(undef, "Checking if container default user is not UID 0");
469 Log(undef, "Checking if user '$try_user' is not UID 0");
470 $try_user_arg = "--user=$try_user";
# The probe runs "id --user" inside the image on the first node.
472 srun(["srun", "--nodelist=" . $node[0]],
474 "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
# Candidate accepted (the success test is not in the visible lines).
478 $dockeruserarg = $try_user_arg;
479 if ($try_user eq "") {
480 Log(undef, "Container will run with default user");
482 Log(undef, "Container will run with $dockeruserarg");
# No candidate was accepted: refuse to run the job.
488 if (!defined $dockeruserarg) {
489 croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
# Work out where the job's source code comes from: an optional Arvados SDK
# archive, then either a plain working directory or a git repository that is
# classified as local or remote.
492 if ($Job->{arvados_sdk_version}) {
493 # The job also specifies an Arvados SDK version. Add the SDKs to the
494 # tar file for the build script to install.
495 Log(undef, sprintf("Packing Arvados SDK version %s for installation",
496 $Job->{arvados_sdk_version}));
# NOTE(review): $git_dir is interpolated here although --git-dir is optional
# (the test just below allows for it being undefined) -- confirm this path
# is only reached when it is set.
497 add_git_archive("git", "--git-dir=$git_dir", "archive",
498 "--prefix=.arvados.sdk/",
499 $Job->{arvados_sdk_version}, "sdk");
503 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
504 # If script_version looks like an absolute path, *and* the --git-dir
505 # argument was not given -- which implies we were not invoked by
506 # crunch-dispatch -- we will use the given path as a working
507 # directory instead of resolving script_version to a git commit (or
508 # doing anything else with git).
509 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
510 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
513 # Resolve the given script_version to a git commit sha1. Also, if
514 # the repository is remote, clone it into our local filesystem: this
515 # ensures "git archive" will work, and is necessary to reliably
516 # resolve a symbolic script_version like "master^".
517 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
519 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
# Provisional value; replaced by the resolved commit id further down.
521 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
523 # If we're running under crunch-dispatch, it will have already
524 # pulled the appropriate source tree into its own repository, and
525 # given us that repo's path as $git_dir.
527 # If we're running a "local" job, we might have to fetch content
528 # from a remote repository.
530 # (Currently crunch-dispatch gives a local path with --git-dir, but
531 # we might as well accept URLs there too in case it changes its mind.)
# --git-dir takes precedence over the job's repository attribute.
533 my $repo = $git_dir || $Job->{'repository'};
535 # Repository can be remote or local. If remote, we'll need to fetch it
536 # to a local dir before doing `git log` et al.
# Matches either "scheme://..." or a colon that appears before any slash.
539 if ($repo =~ m{://|^[^/]*:}) {
540 # $repo is a git url we can clone, like git:// or https:// or
541 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
542 # not recognized here because distinguishing that from a local
543 # path is too fragile. If you really need something strange here,
544 # use the ssh:// form.
545 $repo_location = 'remote';
# Matches "/abs", "./rel" and "../rel": zero or more dots, then a slash.
546 } elsif ($repo =~ m{^\.*/}) {
547 # $repo is a local path to a git index. We'll also resolve ../foo
548 # to ../foo/.git if the latter is a directory. To help
549 # disambiguate local paths from named hosted repositories, this
550 # form must be given as ./ or ../ if it's a relative path.
551 if (-d "$repo/.git") {
552 $repo = "$repo/.git";
554 $repo_location = 'local';
556 # $repo is none of the above. It must be the name of a hosted repository.
558 my $arv_repo_list = api_call("repositories/list",
559 'filters' => [['name','=',$repo]]);
560 my @repos_found = @{$arv_repo_list->{'items'}};
561 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
563 Log(undef, "Repository '$repo' -> "
564 . join(", ", map { $_->{'uuid'} } @repos_found));
# Raised when the lookup is not usable; the exact test on $n_found is not in
# the visible lines.
567 croak("Error: Found $n_found repositories with name '$repo'.");
# A hosted repository is always fetched through its fetch_url.
569 $repo = $repos_found[0]->{'fetch_url'};
570 $repo_location = 'remote';
572 Log(undef, "Using $repo_location repository '$repo'");
573 $ENV{"CRUNCH_SRC_URL"} = $repo;
575 # Resolve given script_version (we'll call that $treeish here) to a
576 # commit sha1 ($commit).
577 my $treeish = $Job->{'script_version'};
579 if ($repo_location eq 'remote') {
580 # We minimize excess object-fetching by re-using the same bare
581 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
582 # just keep adding remotes to it as needed.
583 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
584 my $gitcmd = "git --git-dir=\Q$local_repo\E";
586 # Set up our local repo for caching remote objects, making archives, etc.
588 if (!-d $local_repo) {
589 make_path($local_repo) or croak("Error: could not create $local_repo");
591 # This works (exits 0 and doesn't delete fetched objects) even
592 # if $local_repo is already initialized:
593 `$gitcmd init --bare`;
595 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
598 # If $treeish looks like a hash (or abbrev hash) we look it up in
599 # our local cache first, since that's cheaper. (We don't want to
600 # do that with tags/branches though -- those change over time, so
601 # they should always be resolved by the remote repo.)
602 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
603 # Hide stderr because it's normal for this to fail:
# The leading '' is an empty shell word glued onto the quoted value --
# presumably so that an argument is always passed; confirm intent.
604 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
606 # Careful not to resolve a branch named abcdeff to commit 1234567:
# Accept the lookup only if the result starts with $treeish and is a full
# 40-digit hex id.
607 $sha1 =~ /^$treeish/ &&
608 $sha1 =~ /^([0-9a-f]{40})$/s) {
610 Log(undef, "Commit $commit already present in $local_repo");
614 if (!defined $commit) {
615 # If $treeish isn't just a hash or abbrev hash, or isn't here
616 # yet, we need to fetch the remote to resolve it correctly.
618 # First, remove all local heads. This prevents a name that does
619 # not exist on the remote from resolving to (or colliding with)
620 # a previously fetched branch or tag (possibly from a different remote).
# keep_root leaves the refs/heads directory itself in place.
622 remove_tree("$local_repo/refs/heads", {keep_root => 1});
624 Log(undef, "Fetching objects from $repo to $local_repo");
# The "+" in the refspec forces local heads to be overwritten by the remote's.
625 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
627 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
631 # Now that the data is all here, we will use our local repo for
632 # the rest of our git activities.
# From here on $repo is a repository on local disk in either case.
636 my $gitcmd = "git --git-dir=\Q$repo\E";
637 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
# Anything other than a clean exit with a full 40-digit id is fatal.
638 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
639 croak("`$gitcmd rev-list` exited "
641 .", '$treeish' not found, giving up");
644 Log(undef, "Version $treeish is commit $commit");
646 if ($commit ne $Job->{'script_version'}) {
647 # Record the real commit id in the database, frozentokey, logs,
648 # etc. -- instead of an abbreviation or a branch name which can
649 # become ambiguous or point to a different commit in the future.
650 if (!$Job->update_attributes('script_version' => $commit)) {
651 croak("Error: failed to update job's script_version attribute");
655 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
# Queue an archive of the resolved commit. NOTE(review): this call passes one
# shell-command string while the SDK archive call passes a list of
# arguments -- the helper is not visible, so confirm it accepts both forms.
656 add_git_archive("$gitcmd archive ''\Q$commit\E");
# Merge everything queued so far into a single archive; undef means nothing
# was queued.
659 my $git_archive = combined_git_archive();
660 if (!defined $git_archive) {
661 Log(undef, "Skip install phase (no git archive)");
663 Log(undef, "Warning: This probably means workers have no source tree!");
# Run the build/install script on all workers, retrying up to three times
# when it fails without producing output of its own.
# NOTE(review): $install_script_tries_left is not referenced again in the
# visible lines; the loop is bounded by $attempts instead.
668 my $install_script_tries_left = 3;
669 for (my $attempts = 0; $attempts < 3; $attempts++) {
670 Log(undef, "Run install script on all workers");
672 my @srunargs = ("srun",
673 "--nodelist=$nodelist",
674 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
# The script itself arrives on perl's stdin ("perl -").
675 my @execargs = ("sh", "-c",
676 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
678 $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
# The child's stdout and stderr are both captured through this pipe.
679 my ($install_stderr_r, $install_stderr_w);
680 pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
681 set_nonblocking($install_stderr_r);
682 my $installpid = fork();
# NOTE(review): fork() returns undef on failure and undef == 0 is true, so a
# failed fork would take the child branch here.
683 if ($installpid == 0)
685 close($install_stderr_r);
# NOTE(review): F_SETFL sets file status flags (such as O_NONBLOCK);
# close-on-exec is a descriptor flag controlled with F_SETFD -- confirm that
# this call does what the trailing comment says.
686 fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
687 open(STDOUT, ">&", $install_stderr_w);
688 open(STDERR, ">&", $install_stderr_w);
# The build script followed by the git archive is handed to srun as its
# fourth argument (presumably fed to the command's stdin).
689 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
692 close($install_stderr_w);
693 # Tell freeze_if_want_freeze how to kill the child, otherwise the
694 # "waitpid(installpid)" loop won't get interrupted by a freeze:
695 $proc{$installpid} = {};
697 # Track whether anything appears on stderr other than slurm errors
698 # ("srun: ...") and the "starting: ..." message printed by the
699 # srun subroutine itself:
700 my $stderr_anything_from_script = 0;
701 my $match_our_own_errors = '^(srun: error: |starting: \[)';
702 while ($installpid != waitpid(-1, WNOHANG)) {
703 freeze_if_want_freeze ($installpid);
704 # Wait up to 0.1 seconds for something to appear on stderr, then
705 # do a non-blocking read.
706 my $bits = fhbits($install_stderr_r);
707 select ($bits, undef, $bits, 0.1);
# Append to whatever partial line is already buffered.
708 if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
# Log and remove each complete line from the front of the buffer.
710 while ($stderr_buf =~ /^(.*?)\n/) {
712 substr $stderr_buf, 0, 1+length($line), "";
713 Log(undef, "stderr $line");
714 if ($line !~ /$match_our_own_errors/) {
715 $stderr_anything_from_script = 1;
720 delete $proc{$installpid};
# $? still holds the status collected by the waitpid call that ended the loop.
721 $install_exited = $?;
722 close($install_stderr_r);
# Flush a final line that had no trailing newline.
723 if (length($stderr_buf) > 0) {
724 if ($stderr_buf !~ /$match_our_own_errors/) {
725 $stderr_anything_from_script = 1;
727 Log(undef, "stderr $stderr_buf")
730 Log (undef, "Install script exited ".exit_status_s($install_exited));
731 last if $install_exited == 0 || $main::please_freeze;
732 # If the install script fails but doesn't print an error message,
733 # the next thing anyone is likely to do is just run it again in
734 # case it was a transient problem like "slurm communication fails
735 # because the network isn't reliable enough". So we'll just do
736 # that ourselves (up to 3 attempts in total). OTOH, if there is an
737 # error message, the problem is more likely to have a real fix and
738 # we should fail the job so the fixing process can start, instead
739 # of doing 2 more attempts.
740 last if $stderr_anything_from_script;
# Remove the temporary tar files created while building the archive.
743 foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
744 unlink($tar_filename);
# Handling of a failed install; the body is not in the visible lines.
747 if ($install_exited != 0) {
# Log the job's main attributes (references are JSON-encoded) and each knob.
752 foreach (qw (script script_version script_parameters runtime_constraints))
756 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
758 foreach (split (/\n/, $Job->{knobs}))
760 Log (undef, "knob " . $_);
# Per-level setup: pick the level to run and decide how many slots this round
# may use. $main::success stays undef until the job's outcome is known.
765 $main::success = undef;
771 my $thisround_succeeded = 0;
772 my $thisround_failed = 0;
773 my $thisround_failed_multiple = 0;
774 my $working_slot_count = scalar(@slot);
# Order pending steps by level, then by step index; the first entry therefore
# gives the lowest outstanding level.
776 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
777 or $a <=> $b } @jobstep_todo;
778 my $level = $jobstep[$jobstep_todo[0]]->{level};
780 my $initial_tasks_this_level = 0;
781 foreach my $id (@jobstep_todo) {
782 $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
785 # If the number of tasks scheduled at this level #T is smaller than the number
786 # of slots available #S, only use the first #T slots, or the first slot on
787 # each node, whichever number is greater.
789 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
790 # based on these numbers. Using fewer slots makes more resources available
791 # to each individual task, which should normally be a better strategy when
792 # there are fewer of them running with less parallelism.
794 # Note that this calculation is not redone if the initial tasks at
795 # this level queue more tasks at the same level. This may harm
796 # overall task throughput for that level.
# @freeslot holds indexes into @slot. Because @slot is sorted by CPU number,
# the first scalar(@node) indexes are the first slot of every node.
798 if ($initial_tasks_this_level < @node) {
799 @freeslot = (0..$#node);
800 } elsif ($initial_tasks_this_level < @slot) {
801 @freeslot = (0..$initial_tasks_this_level - 1);
803 @freeslot = (0..$#slot);
805 my $round_num_freeslots = scalar(@freeslot);
# For each node, record the highest CPU number in use this round: walk the
# chosen slots from the end and keep the first value seen per node, stopping
# once every node has an entry.
807 my %round_max_slots = ();
808 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
809 my $this_slot = $slot[$freeslot[$ii]];
810 my $node_name = $this_slot->{node}->{name};
811 $round_max_slots{$node_name} ||= $this_slot->{cpu};
812 last if (scalar(keys(%round_max_slots)) >= @node);
815 Log(undef, "start level $level with $round_num_freeslots slots");
818 my $progress_is_dirty = 1;
819 my $progress_stats_updated = 0;
821 update_progress_stats();
# Dispatch loop: for each pending step at the current level, fork a child
# that sets up the task environment and builds the shell command run on the
# worker. Several block delimiters are not visible; from the signal resets
# onward the lines appear to belong to the forked child (confirm).
825 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
827 # Don't create new tasks if we already know the job's final result.
828 last if defined($main::success);
830 my $id = $jobstep_todo[$todo_ptr];
831 my $Jobstep = $jobstep[$id];
832 if ($Jobstep->{level} != $level)
# The parent reads the task's output through $reader{$id}; the write end is
# the global handle named "writer".
837 pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
838 set_nonblocking($reader{$id});
# Always take the first free slot; its name is "<node>.<cpu>".
840 my $childslot = $freeslot[0];
841 my $childnode = $slot[$childslot]->{node};
842 my $childslotname = join (".",
843 $slot[$childslot]->{node}->{name},
844 $slot[$childslot]->{cpu});
846 my $childpid = fork();
# Restore default signal handling so the parent's flag-setting handlers do
# not apply here.
849 $SIG{'INT'} = 'DEFAULT';
850 $SIG{'QUIT'} = 'DEFAULT';
851 $SIG{'TERM'} = 'DEFAULT';
853 foreach (values (%reader))
# NOTE(review): F_SETFL sets file status flags; close-on-exec is controlled
# with F_SETFD -- confirm this matches the trailing comment's intent.
857 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
858 open(STDOUT,">&writer");
859 open(STDERR,">&writer");
# Environment seen by the task.
864 delete $ENV{"GNUPGHOME"};
865 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
866 $ENV{"TASK_QSEQUENCE"} = $id;
867 $ENV{"TASK_SEQUENCE"} = $level;
868 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Each script parameter is exported as JOB_PARAMETER_<NAME>, with the ASCII
# letters of the name upper-cased.
869 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
870 $param =~ tr/a-z/A-Z/;
871 $ENV{"JOB_PARAMETER_$param"} = $value;
873 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
874 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
875 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
876 $ENV{"HOME"} = $ENV{"TASK_WORK"};
877 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
# Number of slots in use on this node in the current round; used below to
# divide the node's memory between tasks.
878 $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
879 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# The Keep mount point sits next to the task's work directory.
881 my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
# Part of the srun argument list: one task, one CPU, one node, on this
# slot's node.
887 "--nodelist=".$childnode->{name},
888 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
889 "--job-name=$job_id.$id.$$",
892 my $stdbuf = " stdbuf --output=0 --error=0 ";
# keep_cache_mb_per_task is given in MiB; arv-mount receives it in bytes.
894 my $arv_file_cache = "";
895 if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
896 $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
# Shell preamble run on the worker: recreate the task work directory, make
# the other directories, then compute MEMLIMIT as 95% of MemTotal divided by
# CRUNCH_NODE_SLOTS and SWAPLIMIT as MEMLIMIT plus SwapTotal.
900 "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
901 ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
902 ."&& cd \Q$ENV{CRUNCH_TMP}\E "
903 # These environment variables get used explicitly later in
904 # $command. No tool is expected to read these values directly.
905 .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
906 .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
907 ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
908 ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
# Everything appended to $command after this point is run by arv-mount
# through its --exec option.
910 $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
911 $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
912 $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
# Finish building the task command. The first part wraps the task in
# "docker run"; the crunchstat/perl lines near the end are the non-Docker
# form. The condition choosing between the two is not in the visible lines
# (presumably whether a Docker image hash was found).
# The container name is the task UUID plus its failure count so far.
916 my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
917 my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
918 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
919 $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
920 # We only set memory limits if Docker lets us limit both memory and swap.
921 # Memory limits alone have been supported longer, but subprocesses tend
922 # to get SIGKILL if they exceed that without any swap limit set.
923 # See #5642 for additional background.
924 if ($docker_limitmem) {
# MEMLIMIT and SWAPLIMIT are shell variables computed on the worker by the
# command preamble, hence the escaped dollar signs.
925 $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
928 # The source tree and $destdir directory (which we have
929 # installed on the worker host) are available in the container,
930 # under the same path.
931 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
932 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
934 # Currently, we make the "by_pdh" directory in arv-mount's mount
935 # point appear at /keep inside the container (instead of using
936 # the same path as the host like we do with CRUNCH_SRC and
937 # CRUNCH_INSTALL). However, crunch scripts and utilities must
938 # not rely on this. They must use $TASK_KEEPMOUNT.
# The host path is consumed by --volume first; only then is the variable
# switched to the in-container path that the task will see.
939 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
940 $ENV{TASK_KEEPMOUNT} = "/keep";
942 # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
943 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
944 $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
946 # TASK_WORK is almost exactly like a docker data volume: it
947 # starts out empty, is writable, and persists until no
948 # containers use it any more. We don't use --volumes-from to
949 # share it with other containers: it is only accessible to this
950 # task, and it goes away when this task stops.
952 # However, a docker data volume is writable only by root unless
953 # the mount point already happens to exist in the container with
954 # different permissions. Therefore, we [1] assume /tmp already
955 # exists in the image and is writable by the crunch user; [2]
956 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
957 # writable if they are created by docker while setting up the
958 # other --volumes); and [3] create $TASK_WORK inside the
959 # container using $build_script.
960 $command .= "--volume=/tmp ";
961 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
962 $ENV{"HOME"} = $ENV{"TASK_WORK"};
963 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
965 # TODO: Share a single JOB_WORK volume across all task
966 # containers on a given worker node, and delete it when the job
967 # ends (and, in case that doesn't work, when the next job starts).
970 # For now, use the same approach as TASK_WORK above.
971 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
# Forward only ARVADOS_*, CRUNCH_*, JOB_* and TASK_* variables (plus HOME)
# into the container.
973 while (my ($env_key, $env_val) = each %ENV)
975 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
976 $command .= "--env=\Q$env_key=$env_val\E ";
979 $command .= "--env=\QHOME=$ENV{HOME}\E ";
980 $command .= "\Q$docker_hash\E ";
# With an SDK version requested, the crunch script is started through
# "perl -" (a program read from stdin) ...
982 if ($Job->{arvados_sdk_version}) {
984 $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
# ... otherwise it is exec'd directly from a small /bin/sh wrapper that
# reports the Python SDK version on stderr, creates the work directories,
# and uses stdbuf when it is available.
986 $command .= "/bin/sh -c \'python -c " .
987 '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
988 ">&2 2>/dev/null; " .
989 "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
990 "if which stdbuf >/dev/null ; then " .
991 " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
993 " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
# Non-Docker form: crunchstat wraps "perl -" directly on the worker.
998 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
# NOTE(review): unlike the Docker branch, the script path here is not
# shell-quoted with \Q...\E.
1000 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
# The build script is passed as srun's fourth argument (presumably supplied
# on the command's stdin, which "perl -" reads).
1003 my @execargs = ('bash', '-c', $command);
1004 srun (\@srunargs, \@execargs, undef, $build_script);
1005 # exec() failed, we assume nothing happened.
1006 die "srun() failed on build script\n";
# Parent side of the dispatch: record the new child and mark the task started.
# fork() failed: forget the reader handle created for this step.
1009 if (!defined $childpid)
1012 delete $reader{$id};
# %proc maps each child pid to the step it is running.
1016 $proc{$childpid} = { jobstep => $id,
1019 jobstepname => "$job_id.$id.$childpid",
# A slot must never be handed out while it still has a pid recorded.
1021 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1022 $slot[$childslot]->{pid} = $childpid;
1024 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1025 Log ($id, "child $childpid started on $childslotname");
1026 $Jobstep->{starttime} = time;
1027 $Jobstep->{node} = $childnode->{name};
1028 $Jobstep->{slotindex} = $childslot;
# Clear results left over from any earlier attempt at this step.
1029 delete $Jobstep->{stderr};
1030 delete $Jobstep->{finishtime};
1031 delete $Jobstep->{tempfail};
# Record the start time on the task record as a UTC timestamp.
1033 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1034 $Jobstep->{'arvados_task'}->save;
# The step is now running, so take it off the todo list.
1036 splice @jobstep_todo, $todo_ptr, 1;
1039 $progress_is_dirty = 1;
# Second half of a loop condition whose opening is not visible: true when
# some slots are busy and there is no further todo entry to examine.
1043 ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
# Body of the wait loop run while tasks are in flight: honour freeze and info
# requests, refresh state, and move slots on and off hold.
1045 last THISROUND if $main::please_freeze;
1046 if ($main::please_info)
1048 $main::please_info = 0;
1050 create_output_collection();
1052 update_progress_stats();
# Refresh when nothing was reaped or when more than two seconds have passed
# since the last refresh.
1057 if (!$gotsome || ($latest_refresh + 2 < scalar time))
1059 check_refresh_wanted();
1061 update_progress_stats();
# Otherwise update progress stats at least every 30 seconds, or sooner when
# something has changed.
1063 elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1065 update_progress_stats();
# Sleep 0.1 s between polls.
1068 select (undef, undef, undef, 0.1);
# A slot counts as working while its node has no recorded failures and
# fewer than four holds.
1070 $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1071 $_->{node}->{hold_count} < 4 } @slot);
# Abandon the round when repeated failures dominate: at least 8 with no
# successes, or at least 16 and more than the number of successes.
1072 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1073 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1075 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1076 .($thisround_failed+$thisround_succeeded)
1077 .") -- giving up on this round";
1078 Log (undef, $message);
1082 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops run backwards so that splice does not shift entries that have
# not been examined yet.
1083 for (my $i=$#freeslot; $i>=0; $i--) {
1084 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1085 push @holdslot, (splice @freeslot, $i, 1);
1088 for (my $i=$#holdslot; $i>=0; $i--) {
1089 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1090 push @freeslot, (splice @holdslot, $i, 1);
1094 # give up if no nodes are succeeding
1095 if ($working_slot_count < 1) {
1096 Log(undef, "Every node has failed -- giving up");
# Release every held slot (splice with no offset empties @holdslot) and clear
# the losing streak of each free slot's node.
1103 push @freeslot, splice @holdslot;
1104 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1107 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1110 if ($main::please_continue) {
1111 $main::please_continue = 0;
1114 $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1116 if (!reapchildren())
1118 check_refresh_wanted();
1120 update_progress_stats();
1121 select (undef, undef, undef, 0.1);
1122 killem (keys %proc) if $main::please_freeze;
1126 update_progress_stats();
1127 freeze_if_want_freeze();
1130 if (!defined $main::success)
1132 if (!@jobstep_todo) {
1134 } elsif ($working_slot_count < 1) {
1135 save_output_collection();
1137 exit(EX_RETRY_UNLOCKED);
1138 } elsif ($thisround_succeeded == 0 &&
1139 ($thisround_failed == 0 || $thisround_failed > 4)) {
1140 my $message = "stop because $thisround_failed tasks failed and none succeeded";
1141 Log (undef, $message);
1146 goto ONELEVEL if !defined $main::success;
1149 release_allocation();
1151 my $collated_output = save_output_collection();
1152 Log (undef, "finish");
1157 if ($collated_output && $main::success) {
1158 $final_state = 'Complete';
1160 $final_state = 'Failed';
1162 $Job->update_attributes('state' => $final_state);
1164 exit (($final_state eq 'Complete') ? 0 : 1);
# Publish the current task counts (todo / done / running) to the Job
# record and to the log.
#
# The "last updated" timestamp is refreshed on every call, but the Job
# update and the status log line only happen while $progress_is_dirty is
# set; the flag is cleared once the counts have been published.
sub update_progress_stats
{
  $progress_stats_updated = time;
  return unless $progress_is_dirty;

  my %count = (
    todo    => scalar(@jobstep_todo),
    done    => scalar(@jobstep_done),
    running => scalar(keys %proc),
  );

  # Create the summary hash on first use, then fill in the three counters.
  $Job->{'tasks_summary'} ||= {};
  for my $key (qw(todo done running)) {
    $Job->{'tasks_summary'}->{$key} = $count{$key};
  }
  $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});

  Log (undef, "status: $count{done} done, $count{running} running, $count{todo} todo");
  $progress_is_dirty = 0;
}
1188 my $pid = waitpid (-1, WNOHANG);
1189 return 0 if $pid <= 0;
1191 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1193 . $slot[$proc{$pid}->{slot}]->{cpu});
1194 my $jobstepid = $proc{$pid}->{jobstep};
1195 my $elapsed = time - $proc{$pid}->{time};
1196 my $Jobstep = $jobstep[$jobstepid];
1198 my $childstatus = $?;
1199 my $exitvalue = $childstatus >> 8;
1200 my $exitinfo = "exit ".exit_status_s($childstatus);
1201 $Jobstep->{'arvados_task'}->reload;
1202 my $task_success = $Jobstep->{'arvados_task'}->{success};
1204 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1206 if (!defined $task_success) {
1207 # task did not indicate one way or the other --> fail
1208 Log($jobstepid, sprintf(
1209 "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1210 exit_status_s($childstatus)));
1211 $Jobstep->{'arvados_task'}->{success} = 0;
1212 $Jobstep->{'arvados_task'}->save;
1219 $temporary_fail ||= $Jobstep->{tempfail};
1220 $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1222 ++$thisround_failed;
1223 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1225 # Check for signs of a failed or misconfigured node
1226 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1227 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1228 # Don't count this against jobstep failure thresholds if this
1229 # node is already suspected faulty and srun exited quickly
1230 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1232 Log ($jobstepid, "blaming failure on suspect node " .
1233 $slot[$proc{$pid}->{slot}]->{node}->{name});
1234 $temporary_fail ||= 1;
1236 ban_node_by_slot($proc{$pid}->{slot});
1239 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1240 ++$Jobstep->{'failures'},
1241 $temporary_fail ? 'temporary' : 'permanent',
1244 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1245 # Give up on this task, and the whole job
1248 # Put this task back on the todo queue
1249 push @jobstep_todo, $jobstepid;
1250 $Job->{'tasks_summary'}->{'failed'}++;
1254 ++$thisround_succeeded;
1255 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1256 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1257 $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1258 push @jobstep_done, $jobstepid;
1259 Log ($jobstepid, "success in $elapsed seconds");
1261 $Jobstep->{exitcode} = $childstatus;
1262 $Jobstep->{finishtime} = time;
1263 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1264 $Jobstep->{'arvados_task'}->save;
1265 process_stderr ($jobstepid, $task_success);
1266 Log ($jobstepid, sprintf("task output (%d bytes): %s",
1267 length($Jobstep->{'arvados_task'}->{output}),
1268 $Jobstep->{'arvados_task'}->{output}));
1270 close $reader{$jobstepid};
1271 delete $reader{$jobstepid};
1272 delete $slot[$proc{$pid}->{slot}]->{pid};
1273 push @freeslot, $proc{$pid}->{slot};
1276 if ($task_success) {
1278 my $newtask_list = [];
1279 my $newtask_results;
1281 $newtask_results = api_call(
1284 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1286 'order' => 'qsequence',
1287 'offset' => scalar(@$newtask_list),
1289 push(@$newtask_list, @{$newtask_results->{items}});
1290 } while (@{$newtask_results->{items}});
1291 foreach my $arvados_task (@$newtask_list) {
1293 'level' => $arvados_task->{'sequence'},
1295 'arvados_task' => $arvados_task
1297 push @jobstep, $jobstep;
1298 push @jobstep_todo, $#jobstep;
1302 $progress_is_dirty = 1;
# Re-read the Job record when a refresh has been requested, and ask the
# main loop to stop if the job is no longer running.
#
# A refresh counts as requested when the file named by
# $ENV{CRUNCH_REFRESH_TRIGGER} can be stat()ed and its mtime ($stat[9]) is
# newer than $latest_refresh.  Takes no arguments.
# NOTE(review): the end of the attribute list and a few short statements
# are not visible in this excerpt -- confirm against the full file.
1306 sub check_refresh_wanted
1308 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1309 if (@stat && $stat[9] > $latest_refresh) {
1310 $latest_refresh = scalar time;
# Fetch a fresh copy of the job and copy the listed attributes onto $Job.
1311 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1312 for my $attr ('cancelled_at',
1313 'cancelled_by_user_uuid',
1314 'cancelled_by_client_uuid',
1316 $Job->{$attr} = $Job2->{$attr};
# Any state other than "Running" is logged, distinguishing an explicit
# cancellation from an unexpected state change.
1318 if ($Job->{'state'} ne "Running") {
1319 if ($Job->{'state'} eq "Cancelled") {
1320 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1322 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
# Set the flag that the scheduling loops test before killing children and
# freezing.
1325 $main::please_freeze = 1;
1332 my $last_squeue_check = $squeue_checked;
1334 # Do not call `squeue` or check the kill list more than once every
1336 return if $last_squeue_check > time - 15;
1337 $squeue_checked = time;
1339 # Look for children from which we haven't received stderr data since
1340 # the last squeue check. If no such children exist, all procs are
1341 # alive and there's no need to even look at squeue.
1343 # As long as the crunchstat poll interval (10s) is shorter than the
1344 # squeue check interval (15s) this should make the squeue check an
1346 my $silent_procs = 0;
1347 for my $procinfo (values %proc)
1349 my $jobstep = $jobstep[$procinfo->{jobstep}];
1350 if ($jobstep->{stderr_at} < $last_squeue_check)
1355 return if $silent_procs == 0;
1357 # use killem() on procs whose killtime is reached
1358 while (my ($pid, $procinfo) = each %proc)
1360 my $jobstep = $jobstep[$procinfo->{jobstep}];
1361 if (exists $procinfo->{killtime}
1362 && $procinfo->{killtime} <= time
1363 && $jobstep->{stderr_at} < $last_squeue_check)
1366 if ($jobstep->{stderr_at}) {
1367 $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1369 Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1376 # here is an opportunity to check for mysterious problems with local procs
1380 # Get a list of steps still running. Note: squeue(1) says --steps
1381 # selects a format (which we override anyway) and allows us to
1382 # specify which steps we're interested in (which we don't).
1383 # Importantly, it also changes the meaning of %j from "job name" to
1384 # "step name" and (although this isn't mentioned explicitly in the
1385 # docs) switches from "one line per job" mode to "one line per step"
1386 # mode. Without it, we'd just get a list of one job, instead of a
1388 my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1391 Log(undef, "warning: squeue exit status $? ($!)");
1396 # which of my jobsteps are running, according to squeue?
1398 for my $jobstepname (@squeue)
1400 $ok{$jobstepname} = 1;
1403 # Check for child procs >60s old and not mentioned by squeue.
1404 while (my ($pid, $procinfo) = each %proc)
1406 if ($procinfo->{time} < time - 60
1407 && $procinfo->{jobstepname}
1408 && !exists $ok{$procinfo->{jobstepname}}
1409 && !exists $procinfo->{killtime})
1411 # According to slurm, this task has ended (successfully or not)
1412 # -- but our srun child hasn't exited. First we must wait (30
1413 # seconds) in case this is just a race between communication
1414 # channels. Then, if our srun child process still hasn't
1415 # terminated, we'll conclude some slurm communication
1416 # error/delay has caused the task to die without notifying srun,
1417 # and we'll kill srun ourselves.
1418 $procinfo->{killtime} = time + 30;
1419 Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
# Log a notice and cancel this job's SLURM allocation by running `scancel`
# on $ENV{SLURM_JOB_ID}.  Takes no arguments.
# NOTE(review): the lines between the sub header and the Log call are not
# visible here; confirm whether these statements are guarded by a condition.
1425 sub release_allocation
1429 Log (undef, "release job allocation");
# Single-string form of system(): SLURM_JOB_ID is interpolated unquoted and
# the command's exit status is not checked.
1430 system "scancel $ENV{SLURM_JOB_ID}";
1438 foreach my $job (keys %reader)
1441 if (0 < sysread ($reader{$job}, $buf, 65536))
1443 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1444 $jobstep[$job]->{stderr_at} = time;
1445 $jobstep[$job]->{stderr} .= $buf;
1447 # Consume everything up to the last \n
1448 preprocess_stderr ($job);
1450 if (length ($jobstep[$job]->{stderr}) > 16384)
1452 # If we get a lot of stderr without a newline, chop off the
1453 # front to avoid letting our buffer grow indefinitely.
1454 substr ($jobstep[$job]->{stderr},
1455 0, length($jobstep[$job]->{stderr}) - 8192) = "";
# Drain complete lines from the stderr text buffered for one job step,
# log each line, and react to known error messages found in it.
# NOTE(review): the declarations of $job and $line are not visible in this
# excerpt; presumably $job is the job step index argument and $line is the
# text captured by the loop's pattern -- confirm against the full file.
1464 sub preprocess_stderr
# Peel off one newline-terminated line per iteration; a trailing partial
# line stays in the buffer.
1468 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the line plus its newline from the front of the buffer.
1470 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1471 Log ($job, "stderr $line");
# srun says the SLURM job has expired or its allocation cannot be
# confirmed: request a freeze.
1472 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1474 $main::please_freeze = 1;
# Node failure or I/O error: count it against the node that ran this step,
# mark the step as a temporary failure, and back the node off.
1476 elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1477 my $job_slot_index = $jobstep[$job]->{slotindex};
1478 $slot[$job_slot_index]->{node}->{fail_count}++;
1479 $jobstep[$job]->{tempfail} = 1;
1480 ban_node_by_slot($job_slot_index);
# srun could not create the step or lost a connection: temporary failure
# and node back-off, without touching fail_count.
1482 elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1483 $jobstep[$job]->{tempfail} = 1;
1484 ban_node_by_slot($jobstep[$job]->{slotindex});
# Lines mentioning arvados.errors.Keep only mark the step as a temporary
# failure.
1486 elsif ($line =~ /arvados\.errors\.Keep/) {
1487 $jobstep[$job]->{tempfail} = 1;
1496 my $task_success = shift;
1497 preprocess_stderr ($job);
1500 Log ($job, "stderr $_");
1501 } split ("\n", $jobstep[$job]->{stderr});
1508 if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1509 Log(undef, "fetch_block run error from arv-get $hash: $!");
1512 my $output_block = "";
1515 my $bytes = sysread($keep, $buf, 1024 * 1024);
1516 if (!defined $bytes) {
1517 Log(undef, "fetch_block read error from arv-get: $!");
1518 $output_block = undef;
1520 } elsif ($bytes == 0) {
1521 # sysread returns 0 at the end of the pipe.
1524 # some bytes were read into buf.
1525 $output_block .= $buf;
1530 Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1531 $output_block = undef;
1533 return $output_block;
1536 # Create a collection by concatenating the output of all tasks (each
1537 # task's output is either a manifest fragment, a locator for a
1538 # manifest fragment stored in Keep, or nothing at all). Return the
1539 # portable_data_hash of the new collection.
# NOTE(review): the loop header, several declarations ($next_write,
# $task_idx, $joboutput) and the return statement are not visible in this
# excerpt -- confirm against the full file.
1540 sub create_output_collection
1542 Log (undef, "collate");
1544 my ($child_out, $child_in);
# Spawn a helper `python -c` process.  Its script reads the manifest text
# from stdin, creates a collection through the Arvados API, and prints the
# resulting portable_data_hash; sys.argv[1] supplies the retry count.
1545 my $pid = open2($child_out, $child_in, 'python', '-c', q{
1548 print (arvados.api("v1").collections().
1549 create(body={"manifest_text": sys.stdin.read()}).
1550 execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1554 my $manifest_size = 0;
# Tasks that recorded no output contribute nothing.
1558 my $output = $_->{'arvados_task'}->{output};
1559 next if (!defined($output));
# An output that is exactly a 32-hex-digit hash with optional "+hint"
# suffixes is fetched with fetch_block; anything else is written as-is.
1561 if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1562 $next_write = fetch_block($output);
1564 $next_write = $output;
1566 if (defined($next_write)) {
1567 if (!defined(syswrite($child_in, $next_write))) {
1568 # There's been an error writing. Stop the loop.
1569 # We'll log details about the exit code later.
1572 $manifest_size += length($next_write);
# $next_write is undefined here, so the output could not be obtained.
1575 my $uuid = $_->{'arvados_task'}->{'uuid'};
1576 Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1581 Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
# Wait up to 120 seconds for the helper to produce output, then read up to
# 1 MiB of it.
1584 my $s = IO::Select->new($child_out);
1585 if ($s->can_read(120)) {
1586 sysread($child_out, $joboutput, 1024 * 1024);
1589 Log(undef, "output collection creation exited " . exit_status_s($?));
# Nothing became readable in time: send escalating signals (SIGINT three
# times, SIGTERM twice, then SIGKILL), stopping as soon as waitpid returns
# -1 for the helper's pid.
1595 Log (undef, "timed out while creating output collection");
1596 foreach my $signal (2, 2, 2, 15, 15, 9) {
1597 kill($signal, $pid);
1598 last if waitpid($pid, WNOHANG) == -1;
# Calls create_output_collection, logs the result, and returns it.
# If that was successful, save that as the output in the job record.
#
# Returns whatever create_output_collection returned; a false value means
# the collection could not be written, in which case the job record is
# left untouched.
sub save_output_collection {
  my $collated_output = create_output_collection();

  if ($collated_output) {
    Log(undef, "job output $collated_output");
    $Job->update_attributes('output' => $collated_output);
  }
  else {
    # Only log the failure: never record a missing output on the job.
    Log(undef, "Failed to write output collection");
  }
  return $collated_output;
}
1626 my $sig = 2; # SIGINT first
1627 if (exists $proc{$_}->{"sent_$sig"} &&
1628 time - $proc{$_}->{"sent_$sig"} > 4)
1630 $sig = 15; # SIGTERM if SIGINT doesn't work
1632 if (exists $proc{$_}->{"sent_$sig"} &&
1633 time - $proc{$_}->{"sent_$sig"} > 4)
1635 $sig = 9; # SIGKILL if SIGTERM doesn't work
1637 if (!exists $proc{$_}->{"sent_$sig"})
1639 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1641 select (undef, undef, undef, 0.1);
1644 kill $sig, $_; # srun wants two SIGINT to really interrupt
1646 $proc{$_}->{"sent_$sig"} = time;
1647 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1657 vec($bits,fileno($_),1) = 1;
1663 # Send log output to Keep via arv-put.
1665 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1666 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1667 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1668 # $log_pipe_pid is the pid of the arv-put subprocess.
1670 # The only functions that should access these variables directly are:
1672 # log_writer_start($logfilename)
1673 # Starts an arv-put pipe, reading data on stdin and writing it to
1674 # a $logfilename file in an output collection.
1676 # log_writer_read_output([$timeout])
1677 # Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1678 # Passes $timeout to the select() call, with a default of 0.01.
1679 # Returns the result of the last read() call on $log_pipe_out, or
1680 # -1 if read() wasn't called because select() timed out.
1681 # Only other log_writer_* functions should need to call this.
1683 # log_writer_send($txt)
1684 # Writes $txt to the output log collection.
1686 # log_writer_finish()
1687 # Closes the arv-put pipe and returns the output that it produces.
1689 # log_writer_is_active()
1690 # Returns a true value if there is currently a live arv-put
1691 # process, false otherwise.
1693 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
# Start the log writer child process with open2 and remember its pid and
# pipe handles in the file-scoped $log_pipe_* variables.
#
# $logfilename is handed to the child after a '--filename' argument.  The
# buffer of output read from the child is reset to empty and an IO::Select
# object is created over the child's output handle.
# NOTE(review): the command name and the remaining arguments passed to
# open2 are not visible in this excerpt.
1696 sub log_writer_start($)
1698 my $logfilename = shift;
1699 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1703 '--filename', $logfilename,
1705 $log_pipe_out_buf = "";
1706 $log_pipe_out_select = IO::Select->new($log_pipe_out);
# Read whatever output the log writer child has produced so far and append
# it to $log_pipe_out_buf.
#
# Arguments: an optional select() timeout in seconds (default 0.01; a
# timeout of 0 also falls back to the default).
# Returns the result of the last read() call on $log_pipe_out (0 at EOF,
# undef on error), or -1 if read() was never called because select() timed
# out.
sub log_writer_read_output {
  my $timeout = shift || 0.01;

  # -1 doubles as "select timed out before any read" and as a true value
  # that lets the loop start.
  my $read = -1;
  while ($read && $log_pipe_out_select->can_read($timeout)) {
    # Append at the current end of the buffer rather than overwriting it.
    $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
                 length($log_pipe_out_buf));
  }
  if (!defined($read)) {
    Log(undef, "error reading log manifest from arv-put: $!");
  }
  return $read;
}
# Write $txt to the log writer pipe, then collect any output the child has
# already produced.
#
# Arguments: $txt, the text to append to the log.
# Returns the result of log_writer_read_output().
sub log_writer_send($)
{
  my $txt = shift;
  print {$log_pipe_in} $txt;
  # Read the child's output as we go instead of leaving it all to the end.
  log_writer_read_output();
}
# Close the log writer pipe, wait for the child, and return everything it
# printed.
#
# Returns nothing when no log writer is running.  Otherwise returns the
# accumulated child output, or undef if reading timed out, did not reach
# EOF, or the child exited with a non-zero status.  All $log_pipe_*
# variables are reset to undef before returning.
sub log_writer_finish()
{
  return unless $log_pipe_pid;

  # Closing the child's input signals that there is no more log data.
  close($log_pipe_in);

  # Allow up to 120 seconds for the remaining output; 0 means clean EOF.
  my $eof_status = log_writer_read_output(120);
  my $failure = 0;
  if ($eof_status == -1) {
    $failure = -1;
    Log (undef, "timed out reading from 'arv-put'");
  }
  elsif ($eof_status != 0) {
    $failure = -2;
    Log(undef, "failed to read arv-put log manifest to EOF");
  }

  waitpid($log_pipe_pid, 0);
  if ($?) {
    # Keep an earlier read failure code if there is one.
    $failure ||= $?;
    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?));
  }

  close($log_pipe_out);
  my $manifest = $failure ? undef : $log_pipe_out_buf;

  # Forget the finished writer so log_writer_is_active() turns false.
  $log_pipe_pid = undef;
  $log_pipe_in = undef;
  $log_pipe_out = undef;
  $log_pipe_out_buf = undef;
  $log_pipe_out_select = undef;

  return $manifest;
}
# Report whether a log writer child is currently running.
# Returns the child's pid (true) while one is active, and a false value
# otherwise.
sub log_writer_is_active() {
  my $writer_pid = $log_pipe_pid;
  return $writer_pid;
}
# Emit one log message to STDERR and, when a log writer is active, to the
# log writer as well.
#
# Arguments: $_[0] is the job step id (callers pass undef for job-level
# messages), $_[1] is the message text.
# NOTE(review): several short statements are not visible in this excerpt,
# including how each line of a multi-line message is handled and where
# $datetime is declared -- confirm against the full file.
1763 sub Log # ($jobstep_id, $logmessage)
# Messages containing newlines are processed line by line.
1765 if ($_[1] =~ /\n/) {
1766 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1771 my $fh = select STDERR; $|=1; select $fh;
# Message layout: job id, this process's pid, then the two arguments.
1772 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Replace every character outside the range space..'~' (octal 176) with a
# backslash followed by its three-digit octal code.
1773 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# The UTC timestamp is only built when something will print it: the log
# writer, or STDERR when it is a terminal.
1776 if (log_writer_is_active() || -t STDERR) {
1777 my @gmtime = gmtime;
1778 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1779 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp prefix only when it is a terminal.
1781 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
# The log writer always gets the timestamped form.
1783 if (log_writer_is_active()) {
1784 log_writer_send($datetime . " " . $message);
1791 my ($package, $file, $line) = caller;
1792 my $message = "@_ at $file line $line\n";
1793 Log (undef, $message);
1794 freeze() if @jobstep_todo;
1795 create_output_collection() if @jobstep_todo;
1805 if ($Job->{'state'} eq 'Cancelled') {
1806 $Job->update_attributes('finished_at' => scalar gmtime);
1808 $Job->update_attributes('state' => 'Failed');
1815 my $justcheckpoint = shift; # false if this will be the last meta saved
1816 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1817 return unless log_writer_is_active();
1818 my $log_manifest = log_writer_finish();
1819 return unless defined($log_manifest);
1822 my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1823 $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1826 my $log_coll = api_call(
1827 "collections/create", ensure_unique_name => 1, collection => {
1828 manifest_text => $log_manifest,
1829 owner_uuid => $Job->{owner_uuid},
1830 name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1832 Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1833 $Job->update_attributes('log' => $log_coll->{portable_data_hash});
# When a freeze has been requested ($main::please_freeze), release the
# allocation, kill and reap the remaining children, and collate output.
#
# Arguments: an optional list of keys that are entered into %proc with an
# empty record before the kill loop runs.
# NOTE(review): the loop around killem/waitpid and the statements after
# create_output_collection() are not visible in this excerpt.
1837 sub freeze_if_want_freeze
1839 if ($main::please_freeze)
1841 release_allocation();
1844 # kill some srun procs before freeze+stop
# map is used here only for its side effect on %proc.
1845 map { $proc{$_} = {} } @_;
# Signal every known child, pause for 0.1 seconds, then reap whatever has
# exited without blocking.
1848 killem (keys %proc);
1849 select (undef, undef, undef, 0.1);
1851 while (($died = waitpid (-1, WNOHANG)) > 0)
1853 delete $proc{$died};
1858 create_output_collection();
1868 Log (undef, "Freeze not implemented");
1875 croak ("Thaw not implemented");
1891 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1898 my $srunargs = shift;
1899 my $execargs = shift;
1900 my $opts = shift || {};
1902 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1904 $Data::Dumper::Terse = 1;
1905 $Data::Dumper::Indent = 0;
1906 my $show_cmd = Dumper($args);
1907 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1908 $show_cmd =~ s/\n/ /g;
1909 if ($opts->{fork}) {
1910 Log(undef, "starting: $show_cmd");
1912 # This is a child process: parent is in charge of reading our
1913 # stderr and copying it to Log() if needed.
1914 warn "starting: $show_cmd\n";
1917 if (defined $stdin) {
1918 my $child = open STDIN, "-|";
1919 defined $child or die "no fork: $!";
1921 print $stdin or die $!;
1922 close STDOUT or die $!;
1927 return system (@$args) if $opts->{fork};
1930 warn "ENV size is ".length(join(" ",%ENV));
1931 die "exec failed: $!: @$args";
sub ban_node_by_slot {
  # Don't start any new jobsteps on this node for 60 seconds.
  #
  # Arguments: $slotid, an index into @slot.  The node behind that slot
  # gets its hold_until set to 60 seconds from now and its hold_count
  # incremented, and the back-off is logged.
  my $slotid = shift;
  my $hold_seconds = 60;
  my $node = $slot[$slotid]->{node};

  $node->{hold_until} = $hold_seconds + scalar time;
  $node->{hold_count}++;
  Log (undef, "backing off node " . $node->{name} . " for $hold_seconds seconds");
}
1945 my ($lockfile, $error_message) = @_;
1946 open L, ">", $lockfile or croak("$lockfile: $!");
1947 if (!flock L, LOCK_EX|LOCK_NB) {
1948 croak("Can't lock $lockfile: $error_message\n");
1952 sub find_docker_image {
1953 # Given a Keep locator, check to see if it contains a Docker image.
1954 # If so, return its stream name and Docker hash.
1955 # If not, return undef for both values.
# NOTE(review): a few short lines (any guard on $image, blank-line
# handling, the else branches) are not visible in this excerpt.
1956 my $locator = shift;
1957 my ($streamname, $filename);
1958 my $image = api_call("collections/get", uuid => $locator);
# Walk the manifest one line at a time; the first whitespace-separated
# token of a line is taken as the stream name.
1960 foreach my $line (split(/\n/, $image->{manifest_text})) {
1961 my @tokens = split(/\s+/, $line);
1963 $streamname = shift(@tokens);
# Tokens of the form digits:digits:name are treated as file entries.
1964 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1965 if (defined($filename)) {
1966 return (undef, undef); # More than one file in the Collection.
# Limit the split to three fields so a name containing ':' stays intact.
1968 $filename = (split(/:/, $filedata, 3))[2];
# The single file must be named <64 hex digits>.tar; the hex part is
# returned as the Docker hash.
1973 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1974 return ($streamname, $1);
1976 return (undef, undef);
1981 # Calculate the number of times an operation should be retried,
1982 # assuming exponential backoff, and that we're willing to retry as
1983 # long as tasks have been running. Enforce a minimum of 3 retries.
1984 my ($starttime, $endtime, $timediff, $retries);
1986 $starttime = $jobstep[0]->{starttime};
1987 $endtime = $jobstep[-1]->{finishtime};
1989 if (!defined($starttime)) {
1991 } elsif (!defined($endtime)) {
1992 $timediff = time - $starttime;
1994 $timediff = ($endtime - $starttime) - (time - $endtime);
1996 if ($timediff > 0) {
1997 $retries = int(log($timediff) / log(2));
1999 $retries = 1; # Use the minimum.
2001 return ($retries > 3) ? $retries : 3;
2005 # Pass in two function references.
2006 # This method will be called with the remaining arguments.
2007 # If it dies, retry it with exponential backoff until it succeeds,
2008 # or until the current retry_count is exhausted. After each failure
2009 # that can be retried, the second function will be called with
2010 # the current try count (0-based), next try time, and error message.
2011 my $operation = shift;
2012 my $retry_callback = shift;
2013 my $retries = retry_count();
2014 foreach my $try_count (0..$retries) {
2015 my $next_try = time + (2 ** $try_count);
2016 my $result = eval { $operation->(@_); };
2019 } elsif ($try_count < $retries) {
2020 $retry_callback->($try_count, $next_try, $@);
2021 my $sleep_time = $next_try - time;
2022 sleep($sleep_time) if ($sleep_time > 0);
2025 # Ensure the error message ends in a newline, so Perl doesn't add
2026 # retry_op's line number to it.
2032 # Pass in a /-separated API method name, and arguments for it.
2033 # This function will call that method, retrying as needed until
2034 # the current retry_count is exhausted, with a log on the first failure.
2035 my $method_name = shift;
2036 my $log_api_retry = sub {
2037 my ($try_count, $next_try_at, $errmsg) = @_;
2038 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2039 $errmsg =~ s/\s/ /g;
2040 $errmsg =~ s/\s+$//;
2042 if ($next_try_at < time) {
2043 $retry_msg = "Retrying.";
2045 my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2046 $retry_msg = "Retrying at $next_try_fmt.";
2048 Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2051 foreach my $key (split(/\//, $method_name)) {
2052 $method = $method->{$key};
2054 return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
sub exit_status_s {
  # Given a $?, return a human-readable exit code string like "0" or
  # "1" or "0 with signal 1" or "1 with signal 11".
  #
  # Arguments: $exitcode, a wait status as found in $? or returned by
  # system().
  # Returns a string: the exit value (high byte), followed by
  # " with signal N" when the low 7 bits are non-zero and
  # " with core dump" when bit 0x80 is set.
  my $exitcode = shift;

  # system() and $? report -1 when the command could not be started at
  # all.  Shifting -1 would yield a huge meaningless number, so report
  # that case explicitly.
  if ($exitcode == -1) {
    return "-1 (command could not be run)";
  }

  my $s = $exitcode >> 8;
  my $signal = $exitcode & 0x7f;
  if ($signal) {
    $s .= " with signal " . $signal;
  }
  if ($exitcode & 0x80) {
    $s .= " with core dump";
  }
  return $s;
}
sub handle_readall {
  # Pass in a glob reference to a file handle.
  # Read all its contents and return them as a string.
  my $fh_glob_ref = shift;

  # With the input record separator undefined, a single readline returns
  # everything up to EOF.  local restores the caller's $/ on return.
  local $/ = undef;
  return <$fh_glob_ref>;
}
sub tar_filename_n {
  # Return the path of the Nth saved git archive for this job:
  # $ENV{CRUNCH_TMP}/git.<job id>.<N>.tar
  #
  # Arguments: $n, the archive number (formatted with %d).
  my $n = shift;
  return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
}
2084 sub add_git_archive {
2085 # Pass in a git archive command as a string or list, a la system().
2086 # This method will save its output to be included in the archive sent to the
# NOTE(review): the rest of the comment above and a few short statements
# (including where $git_tar_count and $git_input are set) are not visible
# in this excerpt.
# The command's output is written to the tar file numbered $git_tar_count.
2090 if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2091 croak("Failed to save git archive: $!");
# ">&GIT_ARCHIVE" makes open2 connect the child's stdout straight to the
# already-open file instead of creating a pipe.
2093 my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2095 waitpid($git_pid, 0);
2098 croak("Failed to save git archive: git exited " . exit_status_s($?));
sub combined_git_archive {
  # Combine all saved tar archives into a single archive, then return its
  # contents in a string.  Return undef if no archives have been saved.
  #
  # Archives 2..$git_tar_count are appended to archive 1 in place with
  # `tar -A`.  Any tar failure, or failure to open the result, is reported
  # through croak().
  if ($git_tar_count < 1) {
    # Callers use this in scalar context, so an explicit undef is returned
    # as documented.
    return undef;
  }

  my $base_tar_name = tar_filename_n(1);
  foreach my $index (2..$git_tar_count) {
    my $tar_to_append = tar_filename_n($index);
    # List form of system(): the file names never pass through a shell.
    my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
    if ($tar_exit != 0) {
      croak("Error preparing build archive: tar -A exited " .
            exit_status_s($tar_exit));
    }
  }

  # A lexical handle keeps the open file private to this call.
  my $git_tar;
  if (!open($git_tar, "<", $base_tar_name)) {
    croak("Could not open build archive: $!");
  }
  my $tar_contents = handle_readall($git_tar);
  close($git_tar);
  return $tar_contents;
}
sub set_nonblocking {
  # Put a file handle into non-blocking mode by adding O_NONBLOCK to its
  # existing status flags.
  #
  # Arguments: $fh, an open file handle.
  # Reports failure of either fcntl call through croak().
  my $fh = shift;

  # fcntl returns "0 but true" when the flag word is 0, so "or" only
  # triggers on a real error.
  my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
  fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
}
2133 # This is crunch-job's internal dispatch script. crunch-job running on the API
2134 # server invokes this script on individual compute nodes, or localhost if we're
2135 # running a job locally. It gets called in two modes:
2137 # * No arguments: Installation mode. Read a tar archive from the DATA
2138 # file handle; it includes the Crunch script's source code, and
2139 # maybe SDKs as well. Those should be installed in the proper
2140 # locations. This runs outside of any Docker container, so don't try to
2141 # introspect Crunch's runtime environment.
2143 # * With arguments: Crunch script run mode. This script should set up the
2144 # environment, then run the command specified in the arguments. This runs
2145 # inside any Docker container.
2148 use File::Path qw( make_path remove_tree );
2149 use POSIX qw(getcwd);
2151 use constant TASK_TEMPFAIL => 111;
2153 # Map SDK subdirectories to the path environments they belong to.
2154 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2156 my $destdir = $ENV{"CRUNCH_SRC"};
2157 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2158 my $repo = $ENV{"CRUNCH_SRC_URL"};
2159 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2160 my $job_work = $ENV{"JOB_WORK"};
2161 my $task_work = $ENV{"TASK_WORK"};
2163 open(STDOUT_ORIG, ">&", STDOUT);
2164 open(STDERR_ORIG, ">&", STDERR);
2166 for my $dir ($destdir, $job_work, $task_work) {
2169 -e $dir or die "Failed to create temporary directory ($dir): $!";
2174 remove_tree($task_work, {keep_root => 1});
2177 ### Crunch script run mode
2179 # We want to do routine logging during task 0 only. This gives the user
2180 # the information they need, but avoids repeating the information for every
2183 if ($ENV{TASK_SEQUENCE} eq "0") {
2186 printf STDERR_ORIG "[Crunch] $msg\n", @_;
2192 my $python_src = "$install_dir/python";
2193 my $venv_dir = "$job_work/.arvados.venv";
2194 my $venv_built = -e "$venv_dir/bin/activate";
2195 if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2196 shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2197 "--python=python2.7", $venv_dir);
2198 shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2200 $Log->("Built Python SDK virtualenv");
2203 my @pysdk_version_cmd = ("python", "-c",
2204 "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2206 $Log->("Running in Python SDK virtualenv");
2207 @pysdk_version_cmd = ();
2208 my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2209 @ARGV = ("/bin/sh", "-ec",
2210 ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2211 } elsif (-d $python_src) {
2212 $Log->("Warning: virtualenv not found inside Docker container default " .
2213 "\$PATH. Can't install Python SDK.");
2216 if (@pysdk_version_cmd) {
2217 open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2218 my $pysdk_version = <$pysdk_version_pipe>;
2219 close($pysdk_version_pipe);
2221 chomp($pysdk_version);
2222 $Log->("Using Arvados SDK version $pysdk_version");
2224 # A lot could've gone wrong here, but pretty much all of it means that
2225 # Python won't be able to load the Arvados SDK.
2226 $Log->("Warning: Arvados SDK not found");
2230 while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2231 my $sdk_path = "$install_dir/$sdk_dir";
2233 if ($ENV{$sdk_envkey}) {
2234 $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2236 $ENV{$sdk_envkey} = $sdk_path;
2238 $Log->("Arvados SDK added to %s", $sdk_envkey);
2243 die "Cannot exec `@ARGV`: $!";
2246 ### Installation mode
2247 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2249 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2250 # This exact git archive (source + arvados sdk) is already installed
2251 # here, so there's no need to reinstall it.
2253 # We must consume our DATA section, though: otherwise the process
2254 # feeding it to us will get SIGPIPE.
2256 while (read(DATA, $buf, 65536)) { }
2261 unlink "$destdir.archive_hash";
2265 # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2266 local $SIG{PIPE} = "IGNORE";
2267 warn "Extracting archive: $archive_hash\n";
2268 # --ignore-zeros is necessary sometimes: depending on how much NUL
2269 # padding tar -A put on our combined archive (which in turn depends
2270 # on the length of the component archives) tar without
2271 # --ignore-zeros will exit before consuming stdin and cause close()
2272 # to fail on the resulting SIGPIPE.
2273 if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2274 die "Error launching 'tar -xC $destdir': $!";
2276 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2277 # get SIGPIPE. We must feed it data incrementally.
2279 while (read(DATA, $tar_input, 65536)) {
2280 print TARX $tar_input;
2283 die "'tar -xC $destdir' exited $?: $!";
2289 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2291 foreach my $sdk_lang (("python",
2292 map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2293 if (-d "$sdk_root/$sdk_lang") {
2294 if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2295 die "Failed to install $sdk_lang SDK: $!";
2301 my $python_dir = "$install_dir/python";
2302 if ((-d $python_dir) and can_run("python2.7")) {
2303 open(my $egg_info_pipe, "-|",
2304 "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2305 my @egg_info_errors = <$egg_info_pipe>;
2306 close($egg_info_pipe);
2309 if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2310 # egg_info apparently failed because it couldn't ask git for a build tag.
2311 # Specify no build tag.
2312 open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2313 print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2316 my $egg_info_exit = $? >> 8;
2317 foreach my $errline (@egg_info_errors) {
2320 warn "python setup.py egg_info failed: exit $egg_info_exit";
2321 exit ($egg_info_exit || 1);
2326 # Hide messages from the install script (unless it fails: shell_or_die
2327 # will show $destdir.log in that case).
2328 open(STDOUT, ">>", "$destdir.log");
2329 open(STDERR, ">&", STDOUT);
2331 if (-e "$destdir/crunch_scripts/install") {
2332 shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2333 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2335 shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2336 } elsif (-e "./install.sh") {
2337 shell_or_die (undef, "./install.sh", $install_dir);
2340 if ($archive_hash) {
2341 unlink "$destdir.archive_hash.new";
2342 symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2343 rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2349 my $command_name = shift;
2350 open(my $which, "-|", "which", $command_name);
2351 while (<$which>) { }
2358 my $exitcode = shift;
2360 if ($ENV{"DEBUG"}) {
2361 print STDERR "@_\n";
2363 if (system (@_) != 0) {
2366 my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2367 open STDERR, ">&STDERR_ORIG";
2368 system ("cat $destdir.log >&2");
2369 warn "@_ failed ($err): $exitstatus";
2370 if (defined($exitcode)) {
2374 exit (($code >> 8) || 1);