Merge branch '6146-job-runtime-sanity' refs #6146
[arvados.git] / sdk / cli / bin / crunch-job
index 554a1431ee291469f7b46cfbaa24e33c435dcc5c..5414033584e676bee7004f1edcc27b97665a6edb 100755 (executable)
@@ -96,6 +96,7 @@ use File::Temp;
 use Fcntl ':flock';
 use File::Path qw( make_path remove_tree );
 
+use constant TASK_TEMPFAIL => 111;
 use constant EX_TEMPFAIL => 75;
 
 $ENV{"TMPDIR"} ||= "/tmp";
@@ -117,18 +118,21 @@ $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
 mkdir ($ENV{"JOB_WORK"});
 
+my %proc;
 my $force_unlock;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
 my $no_clear_tmp;
 my $resume_stash;
+my $docker_bin = "/usr/bin/docker.io";
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
            'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
+           'docker-bin=s' => \$docker_bin,
     );
 
 if (defined $job_api_token) {
@@ -136,7 +140,6 @@ if (defined $job_api_token) {
 }
 
 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $local_job = 0;
 
 
 $SIG{'USR1'} = sub
@@ -148,8 +151,6 @@ $SIG{'USR2'} = sub
   $main::ENV{CRUNCH_DEBUG} = 0;
 };
 
-
-
 my $arv = Arvados->new('apiVersion' => 'v1');
 
 my $Job;
@@ -158,12 +159,41 @@ my $dbh;
 my $sth;
 my @jobstep;
 
-my $User = api_call("users/current");
-
+my $local_job;
 if ($jobspec =~ /^[-a-z\d]+$/)
 {
   # $jobspec is an Arvados UUID, not a JSON job specification
   $Job = api_call("jobs/get", uuid => $jobspec);
+  $local_job = 0;
+}
+else
+{
+  $Job = JSON::decode_json($jobspec);
+  $local_job = 1;
+}
+
+
+# Make sure our workers (our slurm nodes, localhost, or whatever) are
+# at least able to run basic commands: they aren't down or severely
+# misconfigured.
+my $cmd = ['true'];
+if ($Job->{docker_image_locator}) {
+  $cmd = [$docker_bin, 'ps', '-q'];
+}
+Log(undef, "Sanity check is `@$cmd`");
+srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+     $cmd,
+     {fork => 1});
+if ($? != 0) {
+  Log(undef, "Sanity check failed: ".exit_status_s($?));
+  exit EX_TEMPFAIL;
+}
+Log(undef, "Sanity check OK");
+
+
+my $User = api_call("users/current");
+
+if (!$local_job) {
   if (!$force_unlock) {
     # Claim this job, and make sure nobody else does
     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
@@ -175,8 +205,6 @@ if ($jobspec =~ /^[-a-z\d]+$/)
 }
 else
 {
-  $Job = JSON::decode_json($jobspec);
-
   if (!$resume_stash)
   {
     map { croak ("No $_ specified") unless $Job->{$_} }
@@ -361,7 +389,7 @@ if (!defined $no_clear_tmp) {
     # TODO: When #5036 is done and widely deployed, we can get rid of the
     # regular expression and just unmount everything with type fuse.keep.
     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
+          ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
     exit (1);
   }
   while (1)
@@ -374,8 +402,7 @@ if (!defined $no_clear_tmp) {
 }
 
 # If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_stream, $docker_hash);
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
 if ($docker_locator = $Job->{docker_image_locator}) {
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
@@ -384,7 +411,7 @@ if ($docker_locator = $Job->{docker_image_locator}) {
   }
   $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
+if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
 fi
 };
@@ -407,6 +434,12 @@ fi
           .exit_status_s($?));
   }
 
+  # Determine whether this version of Docker supports memory+swap limits.
+  srun(["srun", "--nodelist=" . $node[0]],
+       ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
+      {fork => 1});
+  $docker_limitmem = ($? == 0);
+
   if ($Job->{arvados_sdk_version}) {
     # The job also specifies an Arvados SDK version.  Add the SDKs to the
     # tar file for the build script to install.
@@ -582,32 +615,89 @@ if (!defined $git_archive) {
   }
 }
 else {
-  Log(undef, "Run install script on all workers");
-
-  my @srunargs = ("srun",
-                  "--nodelist=$nodelist",
-                  "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
-  my @execargs = ("sh", "-c",
-                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+  my $install_exited;
+  my $install_script_tries_left = 3;
+  for (my $attempts = 0; $attempts < 3; $attempts++) {
+    Log(undef, "Run install script on all workers");
+
+    my @srunargs = ("srun",
+                    "--nodelist=$nodelist",
+                    "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
+    my @execargs = ("sh", "-c",
+                    "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+
+    $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
+    my ($install_stderr_r, $install_stderr_w);
+    pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
+    set_nonblocking($install_stderr_r);
+    my $installpid = fork();
+    if ($installpid == 0)
+    {
+      close($install_stderr_r);
+      fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+      open(STDOUT, ">&", $install_stderr_w);
+      open(STDERR, ">&", $install_stderr_w);
+      srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
+      exit (1);
+    }
+    close($install_stderr_w);
+    # Tell freeze_if_want_freeze how to kill the child, otherwise the
+    # "waitpid(installpid)" loop won't get interrupted by a freeze:
+    $proc{$installpid} = {};
+    my $stderr_buf = '';
+    # Track whether anything appears on stderr other than slurm errors
+    # ("srun: ...") and the "starting: ..." message printed by the
+    # srun subroutine itself:
+    my $stderr_anything_from_script = 0;
+    my $match_our_own_errors = '^(srun: error: |starting: \[)';
+    while ($installpid != waitpid(-1, WNOHANG)) {
+      freeze_if_want_freeze ($installpid);
+      # Wait up to 0.1 seconds for something to appear on stderr, then
+      # do a non-blocking read.
+      my $bits = fhbits($install_stderr_r);
+      select ($bits, undef, $bits, 0.1);
+      if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
+      {
+        while ($stderr_buf =~ /^(.*?)\n/) {
+          my $line = $1;
+          substr $stderr_buf, 0, 1+length($line), "";
+          Log(undef, "stderr $line");
+          if ($line !~ /$match_our_own_errors/) {
+            $stderr_anything_from_script = 1;
+          }
+        }
+      }
+    }
+    delete $proc{$installpid};
+    $install_exited = $?;
+    close($install_stderr_r);
+    if (length($stderr_buf) > 0) {
+      if ($stderr_buf !~ /$match_our_own_errors/) {
+        $stderr_anything_from_script = 1;
+      }
+      Log(undef, "stderr $stderr_buf")
+    }
 
-  my $installpid = fork();
-  if ($installpid == 0)
-  {
-    srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $installpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($installpid);
-    select (undef, undef, undef, 0.1);
+    Log (undef, "Install script exited ".exit_status_s($install_exited));
+    last if $install_exited == 0 || $main::please_freeze;
+    # If the install script fails but doesn't print an error message,
+    # the next thing anyone is likely to do is just run it again in
+    # case it was a transient problem like "slurm communication fails
+    # because the network isn't reliable enough". So we'll just do
+    # that ourselves (up to 3 attempts in total). OTOH, if there is an
+    # error message, the problem is more likely to have a real fix and
+    # we should fail the job so the fixing process can start, instead
+    # of doing 2 more attempts.
+    last if $stderr_anything_from_script;
   }
-  my $install_exited = $?;
-  Log (undef, "Install script exited ".exit_status_s($install_exited));
+
   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
     unlink($tar_filename);
   }
-  exit (1) if $install_exited != 0;
+
+  if ($install_exited != 0) {
+    croak("Giving up");
+  }
 }
 
 foreach (qw (script script_version script_parameters runtime_constraints))
@@ -636,12 +726,43 @@ my $thisround_failed_multiple = 0;
 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
 my $level = $jobstep[$jobstep_todo[0]]->{level};
-Log (undef, "start level $level");
 
+my $initial_tasks_this_level = 0;
+foreach my $id (@jobstep_todo) {
+  $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
+}
+
+# If the number of tasks scheduled at this level #T is smaller than the number
+# of slots available #S, only use the first #T slots, or the first slot on
+# each node, whichever number is greater.
+#
+# When we dispatch tasks later, we'll allocate whole-node resources like RAM
+# based on these numbers.  Using fewer slots makes more resources available
+# to each individual task, which should normally be a better strategy when
+# there are fewer of them running with less parallelism.
+#
+# Note that this calculation is not redone if the initial tasks at
+# this level queue more tasks at the same level.  This may harm
+# overall task throughput for that level.
+my @freeslot;
+if ($initial_tasks_this_level < @node) {
+  @freeslot = (0..$#node);
+} elsif ($initial_tasks_this_level < @slot) {
+  @freeslot = (0..$initial_tasks_this_level - 1);
+} else {
+  @freeslot = (0..$#slot);
+}
+my $round_num_freeslots = scalar(@freeslot);
 
+my %round_max_slots = ();
+for (my $ii = $#freeslot; $ii >= 0; $ii--) {
+  my $this_slot = $slot[$freeslot[$ii]];
+  my $node_name = $this_slot->{node}->{name};
+  $round_max_slots{$node_name} ||= $this_slot->{cpu};
+  last if (scalar(keys(%round_max_slots)) >= @node);
+}
 
-my %proc;
-my @freeslot = (0..$#slot);
+Log(undef, "start level $level with $round_num_freeslots slots");
 my @holdslot;
 my %reader;
 my $progress_is_dirty = 1;
@@ -650,7 +771,6 @@ my $progress_stats_updated = 0;
 update_progress_stats();
 
 
-
 THISROUND:
 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 {
@@ -661,15 +781,15 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     next;
   }
 
-  pipe $reader{$id}, "writer" or croak ($!);
-  my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
-  fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+  pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
+  set_nonblocking($reader{$id});
 
   my $childslot = $freeslot[0];
   my $childnode = $slot[$childslot]->{node};
   my $childslotname = join (".",
                            $slot[$childslot]->{node}->{name},
                            $slot[$childslot]->{cpu});
+
   my $childpid = fork();
   if ($childpid == 0)
   {
@@ -703,7 +823,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     $ENV{"HOME"} = $ENV{"TASK_WORK"};
     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
-    $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+    $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
 
     $ENV{"GZIP"} = "-n";
@@ -717,13 +837,26 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     my $command =
        "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
-       ."&& cd $ENV{CRUNCH_TMP} ";
+       ."&& cd $ENV{CRUNCH_TMP} "
+        # These environment variables get used explicitly later in
+        # $command.  No tool is expected to read these values directly.
+        .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
+        .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
+        ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
+        ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_hash)
     {
-      my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
+      my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+      # We only set memory limits if Docker lets us limit both memory and swap.
+      # Memory limits alone have been supported longer, but subprocesses tend
+      # to get SIGKILL if they exceed that without any swap limit set.
+      # See #5642 for additional background.
+      if ($docker_limitmem) {
+        $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
+      }
 
       # Dynamically configure the container to use the host system as its
       # DNS server.  Get the host's global addresses from the ip command,
@@ -829,9 +962,9 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 
   while (!@freeslot
         ||
-        (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
+        ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
   {
-    last THISROUND if $main::please_freeze;
+    last THISROUND if $main::please_freeze || defined($main::success);
     if ($main::please_info)
     {
       $main::please_info = 0;
@@ -941,7 +1074,7 @@ if (!$collated_output) {
   Log (undef, "Failed to write output collection");
 }
 else {
-  Log(undef, "output hash " . $collated_output);
+  Log(undef, "job output $collated_output");
   $Job->update_attributes('output' => $collated_output);
 }
 
@@ -1010,7 +1143,7 @@ sub reapchildren
   {
     my $temporary_fail;
     $temporary_fail ||= $Jobstep->{node_fail};
-    $temporary_fail ||= ($exitvalue == 111);
+    $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
 
     ++$thisround_failed;
     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
@@ -1031,13 +1164,12 @@ sub reapchildren
 
     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                              ++$Jobstep->{'failures'},
-                             $temporary_fail ? 'temporary ' : 'permanent',
+                             $temporary_fail ? 'temporary' : 'permanent',
                              $elapsed));
 
     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
       # Give up on this task, and the whole job
       $main::success = 0;
-      $main::please_freeze = 1;
     }
     # Put this task back on the todo queue
     push @jobstep_todo, $jobstepid;
@@ -1056,7 +1188,9 @@ sub reapchildren
   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
   $Jobstep->{'arvados_task'}->save;
   process_stderr ($jobstepid, $task_success);
-  Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
+  Log ($jobstepid, sprintf("task output (%d bytes): %s",
+                           length($Jobstep->{'arvados_task'}->{output}),
+                           $Jobstep->{'arvados_task'}->{output}));
 
   close $reader{$jobstepid};
   delete $reader{$jobstepid};
@@ -1230,7 +1364,7 @@ sub preprocess_stderr
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
+    elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
       $jobstep[$job]->{node_fail} = 1;
       ban_node_by_slot($jobstep[$job]->{slotindex});
     }
@@ -1252,16 +1386,19 @@ sub process_stderr
 sub fetch_block
 {
   my $hash = shift;
-  my ($keep, $child_out, $output_block);
-
-  my $cmd = "arv-get \Q$hash\E";
-  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
-  $output_block = '';
+  my $keep;
+  if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
+    Log(undef, "fetch_block run error from arv-get $hash: $!");
+    return undef;
+  }
+  my $output_block = "";
   while (1) {
     my $buf;
     my $bytes = sysread($keep, $buf, 1024 * 1024);
     if (!defined $bytes) {
-      die "reading from arv-get: $!";
+      Log(undef, "fetch_block read error from arv-get: $!");
+      $output_block = undef;
+      last;
     } elsif ($bytes == 0) {
       # sysread returns 0 at the end of the pipe.
       last;
@@ -1271,60 +1408,80 @@ sub fetch_block
     }
   }
   close $keep;
+  if ($?) {
+    Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
+    $output_block = undef;
+  }
   return $output_block;
 }
 
-# create_output_collections generates a new collection containing the
-# output of each successfully completed task, and returns the
-# portable_data_hash for the new collection.
-#
+# Create a collection by concatenating the output of all tasks (each
+# task's output is either a manifest fragment, a locator for a
+# manifest fragment stored in Keep, or nothing at all). Return the
+# portable_data_hash of the new collection.
 sub create_output_collection
 {
   Log (undef, "collate");
 
   my ($child_out, $child_in);
-  my $pid = open2($child_out, $child_in, 'python', '-c',
-                  'import arvados; ' .
-                  'import sys; ' .
-                  'print arvados.api()' .
-                  '.collections()' .
-                  '.create(body={"manifest_text":sys.stdin.read()})' .
-                  '.execute()["portable_data_hash"]'
-      );
-
+  my $pid = open2($child_out, $child_in, 'python', '-c', q{
+import arvados
+import sys
+print (arvados.api("v1").collections().
+       create(body={"manifest_text": sys.stdin.read()}).
+       execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
+}, retry_count());
+
+  my $task_idx = -1;
+  my $manifest_size = 0;
   for (@jobstep)
   {
-    next if (!exists $_->{'arvados_task'}->{'output'} ||
-             !$_->{'arvados_task'}->{'success'});
+    ++$task_idx;
     my $output = $_->{'arvados_task'}->{output};
-    if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
-    {
-      print $child_in $output;
-    }
-    elsif (defined (my $outblock = fetch_block ($output)))
-    {
-      print $child_in $outblock;
+    next if (!defined($output));
+    my $next_write;
+    if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
+      $next_write = fetch_block($output);
+    } else {
+      $next_write = $output;
     }
-    else
-    {
-      Log (undef, "XXX fetch_block($output) failed XXX");
+    if (defined($next_write)) {
+      if (!defined(syswrite($child_in, $next_write))) {
+        # There's been an error writing.  Stop the loop.
+        # We'll log details about the exit code later.
+        last;
+      } else {
+        $manifest_size += length($next_write);
+      }
+    } else {
+      my $uuid = $_->{'arvados_task'}->{'uuid'};
+      Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
       $main::success = 0;
     }
   }
-  $child_in->close;
+  close($child_in);
+  Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
 
   my $joboutput;
   my $s = IO::Select->new($child_out);
   if ($s->can_read(120)) {
-    sysread($child_out, $joboutput, 64 * 1024 * 1024);
-    chomp($joboutput);
-    # TODO: Ensure exit status == 0.
+    sysread($child_out, $joboutput, 1024 * 1024);
+    waitpid($pid, 0);
+    if ($?) {
+      Log(undef, "output collection creation exited " . exit_status_s($?));
+      $joboutput = undef;
+    } else {
+      chomp($joboutput);
+    }
   } else {
     Log (undef, "timed out while creating output collection");
+    foreach my $signal (2, 2, 2, 15, 15, 9) {
+      kill($signal, $pid);
+      last if waitpid($pid, WNOHANG) == -1;
+      sleep(1);
+    }
   }
-  # TODO: kill $pid instead of waiting, now that we've decided to
-  # ignore further output.
-  waitpid($pid, 0);
+  close($child_out);
 
   return $joboutput;
 }
@@ -1398,8 +1555,11 @@ sub log_writer_start($)
 {
   my $logfilename = shift;
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
-                        'arv-put', '--portable-data-hash',
+                        'arv-put',
+                        '--portable-data-hash',
+                        '--project-uuid', $Job->{owner_uuid},
                         '--retries', '3',
+                        '--name', $logfilename,
                         '--filename', $logfilename,
                         '-');
 }
@@ -1573,7 +1733,13 @@ sub srun
   my $show_cmd = Dumper($args);
   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
   $show_cmd =~ s/\n/ /g;
-  warn "starting: $show_cmd\n";
+  if ($opts->{fork}) {
+    Log(undef, "starting: $show_cmd");
+  } else {
+    # This is a child process: parent is in charge of reading our
+    # stderr and copying it to Log() if needed.
+    warn "starting: $show_cmd\n";
+  }
 
   if (defined $stdin) {
     my $child = open STDIN, "-|";
@@ -1782,6 +1948,12 @@ sub combined_git_archive {
   return $tar_contents;
 }
 
+sub set_nonblocking {
+  my $fh = shift;
+  my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
+  fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+}
+
 __DATA__
 #!/usr/bin/perl
 #
@@ -1803,16 +1975,21 @@ use Fcntl ':flock';
 use File::Path qw( make_path remove_tree );
 use POSIX qw(getcwd);
 
+use constant TASK_TEMPFAIL => 111;
+
 # Map SDK subdirectories to the path environments they belong to.
 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
 
 my $destdir = $ENV{"CRUNCH_SRC"};
-my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
+my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
 my $repo = $ENV{"CRUNCH_SRC_URL"};
 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
 my $job_work = $ENV{"JOB_WORK"};
 my $task_work = $ENV{"TASK_WORK"};
 
+open(STDOUT_ORIG, ">&", STDOUT);
+open(STDERR_ORIG, ">&", STDERR);
+
 for my $dir ($destdir, $job_work, $task_work) {
   if ($dir) {
     make_path $dir;
@@ -1824,11 +2001,6 @@ if ($task_work) {
   remove_tree($task_work, {keep_root => 1});
 }
 
-open(STDOUT_ORIG, ">&", STDOUT);
-open(STDERR_ORIG, ">&", STDERR);
-open(STDOUT, ">>", "$destdir.log");
-open(STDERR, ">&", STDOUT);
-
 ### Crunch script run mode
 if (@ARGV) {
   # We want to do routine logging during task 0 only.  This gives the user
@@ -1848,9 +2020,9 @@ if (@ARGV) {
   my $venv_dir = "$job_work/.arvados.venv";
   my $venv_built = -e "$venv_dir/bin/activate";
   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
-    shell_or_die("virtualenv", "--quiet", "--system-site-packages",
+    shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
                  "--python=python2.7", $venv_dir);
-    shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
+    shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
     $venv_built = 1;
     $Log->("Built Python SDK virtualenv");
   }
@@ -1889,10 +2061,6 @@ if (@ARGV) {
     }
   }
 
-  close(STDOUT);
-  close(STDERR);
-  open(STDOUT, ">&", STDOUT_ORIG);
-  open(STDERR, ">&", STDERR_ORIG);
   exec(@ARGV);
   die "Cannot exec `@ARGV`: $!";
 }
@@ -1900,12 +2068,19 @@ if (@ARGV) {
 ### Installation mode
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
-if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
-  # This version already installed -> nothing to do.
+if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
+  # This exact git archive (source + arvados sdk) is already installed
+  # here, so there's no need to reinstall it.
+
+  # We must consume our DATA section, though: otherwise the process
+  # feeding it to us will get SIGPIPE.
+  my $buf;
+  while (read(DATA, $buf, 65536)) { }
+
   exit(0);
 }
 
-unlink "$destdir.commit";
+unlink "$destdir.archive_hash";
 mkdir $destdir;
 
 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
@@ -1945,19 +2120,24 @@ if ((-d $python_dir) and can_run("python2.7") and
   close($pysdk_cfg);
 }
 
+# Hide messages from the install script (unless it fails: shell_or_die
+# will show $destdir.log in that case).
+open(STDOUT, ">>", "$destdir.log");
+open(STDERR, ">&", STDOUT);
+
 if (-e "$destdir/crunch_scripts/install") {
-    shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
+    shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
     # Old version
-    shell_or_die ("./tests/autotests.sh", $install_dir);
+    shell_or_die (undef, "./tests/autotests.sh", $install_dir);
 } elsif (-e "./install.sh") {
-    shell_or_die ("./install.sh", $install_dir);
+    shell_or_die (undef, "./install.sh", $install_dir);
 }
 
-if ($commit) {
-    unlink "$destdir.commit.new";
-    symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
-    rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
+if ($archive_hash) {
+    unlink "$destdir.archive_hash.new";
+    symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
+    rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
 }
 
 close L;
@@ -1972,15 +2152,24 @@ sub can_run {
 
 sub shell_or_die
 {
+  my $exitcode = shift;
+
   if ($ENV{"DEBUG"}) {
     print STDERR "@_\n";
   }
   if (system (@_) != 0) {
     my $err = $!;
-    my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
+    my $code = $?;
+    my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
     open STDERR, ">&STDERR_ORIG";
     system ("cat $destdir.log >&2");
-    die "@_ failed ($err): $exitstatus";
+    warn "@_ failed ($err): $exitstatus";
+    if (defined($exitcode)) {
+      exit $exitcode;
+    }
+    else {
+      exit (($code >> 8) || 1);
+    }
   }
 }