Merge branch 'master' into 7661-fuse-by-pdh
[arvados.git] / sdk / cli / bin / crunch-job
index 6cdaf904c437fc57dede49e91505c8216075f9cb..e2a4e264c3697e916ca8c88f98cf3a88d80e0e9a 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/perl
+#!/usr/bin/env perl
 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
 
 =head1 NAME
@@ -98,6 +98,7 @@ use File::Path qw( make_path remove_tree );
 
 use constant TASK_TEMPFAIL => 111;
 use constant EX_TEMPFAIL => 75;
+use constant EX_RETRY_UNLOCKED => 93;
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -125,20 +126,21 @@ my $jobspec;
 my $job_api_token;
 my $no_clear_tmp;
 my $resume_stash;
+my $docker_bin = "docker.io";
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
            'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
+           'docker-bin=s' => \$docker_bin,
     );
 
 if (defined $job_api_token) {
   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
 }
 
-my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $local_job = 0;
+my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
 
 
 $SIG{'USR1'} = sub
@@ -150,8 +152,6 @@ $SIG{'USR2'} = sub
   $main::ENV{CRUNCH_DEBUG} = 0;
 };
 
-
-
 my $arv = Arvados->new('apiVersion' => 'v1');
 
 my $Job;
@@ -160,12 +160,40 @@ my $dbh;
 my $sth;
 my @jobstep;
 
-my $User = api_call("users/current");
-
+my $local_job;
 if ($jobspec =~ /^[-a-z\d]+$/)
 {
   # $jobspec is an Arvados UUID, not a JSON job specification
   $Job = api_call("jobs/get", uuid => $jobspec);
+  $local_job = 0;
+}
+else
+{
+  $local_job = JSON::decode_json($jobspec);
+}
+
+
+# Make sure our workers (our slurm nodes, localhost, or whatever) are
+# at least able to run basic commands: they aren't down or severely
+# misconfigured.
+my $cmd = ['true'];
+if (($Job || $local_job)->{docker_image_locator}) {
+  $cmd = [$docker_bin, 'ps', '-q'];
+}
+Log(undef, "Sanity check is `@$cmd`");
+srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+     $cmd,
+     {fork => 1});
+if ($? != 0) {
+  Log(undef, "Sanity check failed: ".exit_status_s($?));
+  exit EX_TEMPFAIL;
+}
+Log(undef, "Sanity check OK");
+
+
+my $User = api_call("users/current");
+
+if (!$local_job) {
   if (!$force_unlock) {
     # Claim this job, and make sure nobody else does
     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
@@ -177,19 +205,17 @@ if ($jobspec =~ /^[-a-z\d]+$/)
 }
 else
 {
-  $Job = JSON::decode_json($jobspec);
-
   if (!$resume_stash)
   {
-    map { croak ("No $_ specified") unless $Job->{$_} }
+    map { croak ("No $_ specified") unless $local_job->{$_} }
     qw(script script_version script_parameters);
   }
 
-  $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
-  $Job->{'started_at'} = gmtime;
-  $Job->{'state'} = 'Running';
+  $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
+  $local_job->{'started_at'} = gmtime;
+  $local_job->{'state'} = 'Running';
 
-  $Job = api_call("jobs/create", job => $Job);
+  $Job = api_call("jobs/create", job => $local_job);
 }
 $job_id = $Job->{'uuid'};
 
@@ -266,9 +292,16 @@ foreach (@sinfo)
   {
     Log (undef, "node $nodename - $ncpus slots");
     my $node = { name => $nodename,
-                ncpus => $ncpus,
-                losing_streak => 0,
-                hold_until => 0 };
+                 ncpus => $ncpus,
+                 # The number of consecutive times a task has been dispatched
+                 # to this node and failed.
+                 losing_streak => 0,
+                 # The number of consecutive times that SLURM has reported
+                 # a node failure since the last successful task.
+                 fail_count => 0,
+                 # Don't dispatch work to this node until this time
+                 # (in seconds since the epoch) has passed.
+                 hold_until => 0 };
     foreach my $cpu (1..$ncpus)
     {
       push @slot, { node => $node,
@@ -316,8 +349,7 @@ my @jobstep_todo = ();
 my @jobstep_done = ();
 my @jobstep_tomerge = ();
 my $jobstep_tomerge_level = 0;
-my $squeue_checked;
-my $squeue_kill_checked;
+my $squeue_checked = 0;
 my $latest_refresh = scalar time;
 
 
@@ -358,12 +390,12 @@ if (!defined $no_clear_tmp) {
   my $cleanpid = fork();
   if ($cleanpid == 0)
   {
-    # Find FUSE mounts that look like Keep mounts (the mount path has the
-    # word "keep") and unmount them.  Then clean up work directories.
-    # TODO: When #5036 is done and widely deployed, we can get rid of the
-    # regular expression and just unmount everything with type fuse.keep.
+    # Find FUSE mounts under $CRUNCH_TMP and unmount them.
+    # Then clean up work directories.
+    # TODO: When #5036 is done and widely deployed, we can limit mount's
+    # -t option to simply fuse.keep.
     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
+          ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
     exit (1);
   }
   while (1)
@@ -372,12 +404,14 @@ if (!defined $no_clear_tmp) {
     freeze_if_want_freeze ($cleanpid);
     select (undef, undef, undef, 0.1);
   }
-  Log (undef, "Cleanup command exited ".exit_status_s($?));
+  if ($?) {
+    Log(undef, "Clean work dirs: exit ".exit_status_s($?));
+    exit(EX_RETRY_UNLOCKED);
+  }
 }
 
 # If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
 if ($docker_locator = $Job->{docker_image_locator}) {
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
@@ -415,6 +449,42 @@ fi
       {fork => 1});
   $docker_limitmem = ($? == 0);
 
+  # Find a non-root Docker user to use.
+  # Tries the default user for the container, then 'crunch', then 'nobody',
+  # testing for whether the actual user id is non-zero.  This defends against
+  # mistakes but not malice, but we intend to harden the security in the future
+  # so we don't want anyone getting used to their jobs running as root in their
+  # Docker containers.
+  my @tryusers = ("", "crunch", "nobody");
+  foreach my $try_user (@tryusers) {
+    my $try_user_arg;
+    if ($try_user eq "") {
+      Log(undef, "Checking if container default user is not UID 0");
+      $try_user_arg = "";
+    } else {
+      Log(undef, "Checking if user '$try_user' is not UID 0");
+      $try_user_arg = "--user=$try_user";
+    }
+    srun(["srun", "--nodelist=" . $node[0]],
+         ["/bin/sh", "-ec",
+          "a=`$docker_bin run $try_user_arg $docker_hash id --user` && " .
+          " test \$a -ne 0"],
+         {fork => 1});
+    if ($? == 0) {
+      $dockeruserarg = $try_user_arg;
+      if ($try_user eq "") {
+        Log(undef, "Container will run with default user");
+      } else {
+        Log(undef, "Container will run with $dockeruserarg");
+      }
+      last;
+    }
+  }
+
+  if (!defined $dockeruserarg) {
+    croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
+  }
+
   if ($Job->{arvados_sdk_version}) {
     # The job also specifies an Arvados SDK version.  Add the SDKs to the
     # tar file for the build script to install.
@@ -564,7 +634,7 @@ else {
   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
     croak("`$gitcmd rev-list` exited "
           .exit_status_s($?)
-          .", '$treeish' not found. Giving up.");
+          .", '$treeish' not found, giving up");
   }
   $commit = $1;
   Log(undef, "Version $treeish is commit $commit");
@@ -697,6 +767,7 @@ ONELEVEL:
 my $thisround_succeeded = 0;
 my $thisround_failed = 0;
 my $thisround_failed_multiple = 0;
+my $working_slot_count = scalar(@slot);
 
 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
@@ -809,6 +880,9 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
       "--job-name=$job_id.$id.$$",
        );
+
+    my $stdbuf = " stdbuf --output=0 --error=0 ";
+
     my $command =
        "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
@@ -819,12 +893,13 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
-    $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+    $command .= "&& exec arv-mount --by-pdh --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_hash)
     {
-      my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
+      my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
+      my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
-      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+      $command .= "$docker_bin run --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
       # We only set memory limits if Docker lets us limit both memory and swap.
       # Memory limits alone have been supported longer, but subprocesses tend
       # to get SIGKILL if they exceed that without any swap limit set.
@@ -833,13 +908,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
       }
 
-      # Dynamically configure the container to use the host system as its
-      # DNS server.  Get the host's global addresses from the ip command,
-      # and turn them into docker --dns options using gawk.
-      $command .=
-          q{$(ip -o address show scope global |
-              gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
-
       # The source tree and $destdir directory (which we have
       # installed on the worker host) are available in the container,
       # under the same path.
@@ -889,12 +957,22 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       }
       $command .= "--env=\QHOME=$ENV{HOME}\E ";
       $command .= "\Q$docker_hash\E ";
-      $command .= "stdbuf --output=0 --error=0 ";
-      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+
+      if ($Job->{arvados_sdk_version}) {
+        $command .= $stdbuf;
+        $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
+      } else {
+        $command .= "/bin/sh -c \'mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
+            "if which stdbuf >/dev/null ; then " .
+            "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+            " else " .
+            "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+            " fi\'";
+      }
     } else {
       # Non-docker run
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
-      $command .= "stdbuf --output=0 --error=0 ";
+      $command .= $stdbuf;
       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     }
 
@@ -926,6 +1004,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
   $Jobstep->{slotindex} = $childslot;
   delete $Jobstep->{stderr};
   delete $Jobstep->{finishtime};
+  delete $Jobstep->{tempfail};
 
   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
   $Jobstep->{'arvados_task'}->save;
@@ -951,17 +1030,19 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     my $gotsome
        = readfrompipes ()
        + reapchildren ();
-    if (!$gotsome)
+    if (!$gotsome || ($latest_refresh + 2 < scalar time))
     {
       check_refresh_wanted();
       check_squeue();
       update_progress_stats();
       select (undef, undef, undef, 0.1);
     }
-    elsif (time - $progress_stats_updated >= 30)
+    elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
     {
       update_progress_stats();
     }
+    $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
+                                        $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
        ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
     {
@@ -985,10 +1066,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
 
     # give up if no nodes are succeeding
-    if (!grep { $_->{node}->{losing_streak} == 0 &&
-                    $_->{node}->{hold_count} < 4 } @slot) {
-      my $message = "Every node has failed -- giving up on this round";
-      Log (undef, $message);
+    if ($working_slot_count < 1) {
+      Log(undef, "Every node has failed -- giving up");
       last THISROUND;
     }
   }
@@ -1024,18 +1103,18 @@ freeze_if_want_freeze();
 
 if (!defined $main::success)
 {
-  if (@jobstep_todo &&
-      $thisround_succeeded == 0 &&
-      ($thisround_failed == 0 || $thisround_failed > 4))
-  {
+  if (!@jobstep_todo) {
+    $main::success = 1;
+  } elsif ($working_slot_count < 1) {
+    save_output_collection();
+    save_meta();
+    exit(EX_RETRY_UNLOCKED);
+  } elsif ($thisround_succeeded == 0 &&
+           ($thisround_failed == 0 || $thisround_failed > 4)) {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
     $main::success = 0;
   }
-  if (!@jobstep_todo)
-  {
-    $main::success = 1;
-  }
 }
 
 goto ONELEVEL if !defined $main::success;
@@ -1043,16 +1122,7 @@ goto ONELEVEL if !defined $main::success;
 
 release_allocation();
 freeze();
-my $collated_output = &create_output_collection();
-
-if (!$collated_output) {
-  Log (undef, "Failed to write output collection");
-}
-else {
-  Log(undef, "job output $collated_output");
-  $Job->update_attributes('output' => $collated_output);
-}
-
+my $collated_output = save_output_collection();
 Log (undef, "finish");
 
 save_meta();
@@ -1074,8 +1144,8 @@ sub update_progress_stats
   $progress_stats_updated = time;
   return if !$progress_is_dirty;
   my ($todo, $done, $running) = (scalar @jobstep_todo,
-                                scalar @jobstep_done,
-                                scalar @slot - scalar @freeslot - scalar @holdslot);
+                                 scalar @jobstep_done,
+                                 scalar keys(%proc));
   $Job->{'tasks_summary'} ||= {};
   $Job->{'tasks_summary'}->{'todo'} = $todo;
   $Job->{'tasks_summary'}->{'done'} = $done;
@@ -1117,7 +1187,7 @@ sub reapchildren
   if (!$task_success)
   {
     my $temporary_fail;
-    $temporary_fail ||= $Jobstep->{node_fail};
+    $temporary_fail ||= $Jobstep->{tempfail};
     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
 
     ++$thisround_failed;
@@ -1139,7 +1209,7 @@ sub reapchildren
 
     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                              ++$Jobstep->{'failures'},
-                             $temporary_fail ? 'temporary ' : 'permanent',
+                             $temporary_fail ? 'temporary' : 'permanent',
                              $elapsed));
 
     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
@@ -1155,6 +1225,7 @@ sub reapchildren
     ++$thisround_succeeded;
     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+    $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
     push @jobstep_done, $jobstepid;
     Log ($jobstepid, "success in $elapsed seconds");
   }
@@ -1229,29 +1300,45 @@ sub check_refresh_wanted
 
 sub check_squeue
 {
-  # return if the kill list was checked <4 seconds ago
-  if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
-  {
-    return;
-  }
-  $squeue_kill_checked = time;
+  my $last_squeue_check = $squeue_checked;
 
-  # use killem() on procs whose killtime is reached
-  for (keys %proc)
+  # Do not call `squeue` or check the kill list more than once every
+  # 15 seconds.
+  return if $last_squeue_check > time - 15;
+  $squeue_checked = time;
+
+  # Look for children from which we haven't received stderr data since
+  # the last squeue check. If no such children exist, all procs are
+  # alive and there's no need to even look at squeue.
+  #
+  # As long as the crunchstat poll interval (10s) is shorter than the
+  # squeue check interval (15s) this should make the squeue check an
+  # infrequent event.
+  my $silent_procs = 0;
+  for my $jobstep (values %proc)
   {
-    if (exists $proc{$_}->{killtime}
-       && $proc{$_}->{killtime} <= time)
+    if ($jobstep->{stderr_at} < $last_squeue_check)
     {
-      killem ($_);
+      $silent_procs++;
     }
   }
+  return if $silent_procs == 0;
 
-  # return if the squeue was checked <60 seconds ago
-  if (defined $squeue_checked && $squeue_checked > time - 60)
+  # use killem() on procs whose killtime is reached
+  while (my ($pid, $jobstep) = each %proc)
   {
-    return;
+    if (exists $jobstep->{killtime}
+        && $jobstep->{killtime} <= time
+        && $jobstep->{stderr_at} < $last_squeue_check)
+    {
+      my $sincewhen = "";
+      if ($jobstep->{stderr_at}) {
+        $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+      }
+      Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+      killem ($pid);
+    }
   }
-  $squeue_checked = time;
 
   if (!$have_slurm)
   {
@@ -1259,37 +1346,46 @@ sub check_squeue
     return;
   }
 
-  # get a list of steps still running
-  my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
-  chop @squeue;
-  if ($squeue[-1] ne "ok")
+  # Get a list of steps still running.  Note: squeue(1) says --steps
+  # selects a format (which we override anyway) and allows us to
+  # specify which steps we're interested in (which we don't).
+  # Importantly, it also changes the meaning of %j from "job name" to
+  # "step name" and (although this isn't mentioned explicitly in the
+  # docs) switches from "one line per job" mode to "one line per step"
+  # mode. Without it, we'd just get a list of one job, instead of a
+  # list of N steps.
+  my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
+  if ($? != 0)
   {
+    Log(undef, "warning: squeue exit status $? ($!)");
     return;
   }
-  pop @squeue;
+  chop @squeue;
 
   # which of my jobsteps are running, according to squeue?
   my %ok;
-  foreach (@squeue)
+  for my $jobstepname (@squeue)
   {
-    if (/^(\d+)\.(\d+) (\S+)/)
-    {
-      if ($1 eq $ENV{SLURM_JOBID})
-      {
-       $ok{$3} = 1;
-      }
-    }
+    $ok{$jobstepname} = 1;
   }
 
-  # which of my active child procs (>60s old) were not mentioned by squeue?
-  foreach (keys %proc)
+  # Check for child procs >60s old and not mentioned by squeue.
+  while (my ($pid, $jobstep) = each %proc)
   {
-    if ($proc{$_}->{time} < time - 60
-       && !exists $ok{$proc{$_}->{jobstepname}}
-       && !exists $proc{$_}->{killtime})
+    if ($jobstep->{time} < time - 60
+        && $jobstep->{jobstepname}
+        && !exists $ok{$jobstep->{jobstepname}}
+        && !exists $jobstep->{killtime})
     {
-      # kill this proc if it hasn't exited in 30 seconds
-      $proc{$_}->{killtime} = time + 30;
+      # According to slurm, this task has ended (successfully or not)
+      # -- but our srun child hasn't exited. First we must wait (30
+      # seconds) in case this is just a race between communication
+      # channels. Then, if our srun child process still hasn't
+      # terminated, we'll conclude some slurm communication
+      # error/delay has caused the task to die without notifying srun,
+      # and we'll kill srun ourselves.
+      $jobstep->{killtime} = time + 30;
+      Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
     }
   }
 }
@@ -1300,7 +1396,7 @@ sub release_allocation
   if ($have_slurm)
   {
     Log (undef, "release job allocation");
-    system "scancel $ENV{SLURM_JOBID}";
+    system "scancel $ENV{SLURM_JOB_ID}";
   }
 }
 
@@ -1314,6 +1410,7 @@ sub readfrompipes
     while (0 < sysread ($reader{$job}, $buf, 8192))
     {
       print STDERR $buf if $ENV{CRUNCH_DEBUG};
+      $jobstep[$job]->{stderr_at} = time;
       $jobstep[$job]->{stderr} .= $buf;
       preprocess_stderr ($job);
       if (length ($jobstep[$job]->{stderr}) > 16384)
@@ -1339,10 +1436,19 @@ sub preprocess_stderr
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
-      $jobstep[$job]->{node_fail} = 1;
+    elsif ($line =~ /srun: error: Node failure on/) {
+      my $job_slot_index = $jobstep[$job]->{slotindex};
+      $slot[$job_slot_index]->{node}->{fail_count}++;
+      $jobstep[$job]->{tempfail} = 1;
+      ban_node_by_slot($job_slot_index);
+    }
+    elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
+      $jobstep[$job]->{tempfail} = 1;
       ban_node_by_slot($jobstep[$job]->{slotindex});
     }
+    elsif ($line =~ /arvados\.errors\.Keep/) {
+      $jobstep[$job]->{tempfail} = 1;
+    }
   }
 }
 
@@ -1461,6 +1567,20 @@ print (arvados.api("v1").collections().
   return $joboutput;
 }
 
+# Calls create_output_collection, logs the result, and returns it.
+# If that was successful, save that as the output in the job record.
+sub save_output_collection {
+  my $collated_output = create_output_collection();
+
+  if (!$collated_output) {
+    Log(undef, "Failed to write output collection");
+  }
+  else {
+    Log(undef, "job output $collated_output");
+    $Job->update_attributes('output' => $collated_output);
+  }
+  return $collated_output;
+}
 
 sub killem
 {
@@ -1506,6 +1626,8 @@ sub fhbits
 # Send log output to Keep via arv-put.
 #
 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_out_buf is a string containing all output read from arv-put so far.
+# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
 # $log_pipe_pid is the pid of the arv-put subprocess.
 #
 # The only functions that should access these variables directly are:
@@ -1514,6 +1636,13 @@ sub fhbits
 #     Starts an arv-put pipe, reading data on stdin and writing it to
 #     a $logfilename file in an output collection.
 #
+# log_writer_read_output([$timeout])
+#     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
+#     Passes $timeout to the select() call, with a default of 0.01.
+#     Returns the result of the last read() call on $log_pipe_out, or
+#     -1 if read() wasn't called because select() timed out.
+#     Only other log_writer_* functions should need to call this.
+#
 # log_writer_send($txt)
 #     Writes $txt to the output log collection.
 #
@@ -1524,25 +1653,40 @@ sub fhbits
 #     Returns a true value if there is currently a live arv-put
 #     process, false otherwise.
 #
-my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
+    $log_pipe_pid);
 
 sub log_writer_start($)
 {
   my $logfilename = shift;
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
                         'arv-put',
-                        '--portable-data-hash',
-                        '--project-uuid', $Job->{owner_uuid},
+                        '--stream',
                         '--retries', '3',
-                        '--name', $logfilename,
                         '--filename', $logfilename,
                         '-');
+  $log_pipe_out_buf = "";
+  $log_pipe_out_select = IO::Select->new($log_pipe_out);
+}
+
+sub log_writer_read_output {
+  my $timeout = shift || 0.01;
+  my $read = -1;
+  while ($read && $log_pipe_out_select->can_read($timeout)) {
+    $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
+                 length($log_pipe_out_buf));
+  }
+  if (!defined($read)) {
+    Log(undef, "error reading log manifest from arv-put: $!");
+  }
+  return $read;
 }
 
 sub log_writer_send($)
 {
   my $txt = shift;
   print $log_pipe_in $txt;
+  log_writer_read_output();
 }
 
 sub log_writer_finish()
@@ -1550,22 +1694,24 @@ sub log_writer_finish()
   return unless $log_pipe_pid;
 
   close($log_pipe_in);
-  my $arv_put_output;
 
-  my $s = IO::Select->new($log_pipe_out);
-  if ($s->can_read(120)) {
-    sysread($log_pipe_out, $arv_put_output, 1024);
-    chomp($arv_put_output);
-  } else {
+  my $read_result = log_writer_read_output(120);
+  if ($read_result == -1) {
     Log (undef, "timed out reading from 'arv-put'");
+  } elsif ($read_result != 0) {
+    Log(undef, "failed to read arv-put log manifest to EOF");
   }
 
   waitpid($log_pipe_pid, 0);
-  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
   if ($?) {
-    Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
   }
 
+  close($log_pipe_out);
+  my $arv_put_output = $log_pipe_out_buf;
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
+      $log_pipe_out_select = undef;
+
   return $arv_put_output;
 }
 
@@ -1629,10 +1775,21 @@ sub save_meta
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
   return unless log_writer_is_active();
 
-  my $loglocator = log_writer_finish();
-  Log (undef, "log manifest is $loglocator");
-  $Job->{'log'} = $loglocator;
-  $Job->update_attributes('log', $loglocator);
+  my $log_manifest = "";
+  if ($Job->{log}) {
+    my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
+    $log_manifest .= $prev_log_coll->{manifest_text};
+  }
+  $log_manifest .= log_writer_finish();
+
+  my $log_coll = api_call(
+    "collections/create", ensure_unique_name => 1, collection => {
+      manifest_text => $log_manifest,
+      owner_uuid => $Job->{owner_uuid},
+      name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
+    });
+  Log(undef, "log collection is " . $log_coll->{portable_data_hash});
+  $Job->update_attributes('log' => $log_coll->{portable_data_hash});
 }
 
 
@@ -1708,7 +1865,13 @@ sub srun
   my $show_cmd = Dumper($args);
   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
   $show_cmd =~ s/\n/ /g;
-  warn "starting: $show_cmd\n";
+  if ($opts->{fork}) {
+    Log(undef, "starting: $show_cmd");
+  } else {
+    # This is a child process: parent is in charge of reading our
+    # stderr and copying it to Log() if needed.
+    warn "starting: $show_cmd\n";
+  }
 
   if (defined $stdin) {
     my $child = open STDIN, "-|";
@@ -1924,7 +2087,7 @@ sub set_nonblocking {
 }
 
 __DATA__
-#!/usr/bin/perl
+#!/usr/bin/env perl
 #
 # This is crunch-job's internal dispatch script.  crunch-job running on the API
 # server invokes this script on individual compute nodes, or localhost if we're
@@ -2052,18 +2215,28 @@ if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
 unlink "$destdir.archive_hash";
 mkdir $destdir;
 
-if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
-  die "Error launching 'tar -xC $destdir': $!";
-}
-# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
-# get SIGPIPE.  We must feed it data incrementally.
-my $tar_input;
-while (read(DATA, $tar_input, 65536)) {
-  print TARX $tar_input;
-}
-if(!close(TARX)) {
-  die "'tar -xC $destdir' exited $?: $!";
-}
+do {
+  # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
+  local $SIG{PIPE} = "IGNORE";
+  warn "Extracting archive: $archive_hash\n";
+  # --ignore-zeros is necessary sometimes: depending on how much NUL
+  # padding tar -A put on our combined archive (which in turn depends
+  # on the length of the component archives) tar without
+  # --ignore-zeros will exit before consuming stdin and cause close()
+  # to fail on the resulting SIGPIPE.
+  if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
+    die "Error launching 'tar -xC $destdir': $!";
+  }
+  # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
+  # get SIGPIPE.  We must feed it data incrementally.
+  my $tar_input;
+  while (read(DATA, $tar_input, 65536)) {
+    print TARX $tar_input;
+  }
+  if(!close(TARX)) {
+    die "'tar -xC $destdir' exited $?: $!";
+  }
+};
 
 mkdir $install_dir;
 
@@ -2080,13 +2253,28 @@ if (-d $sdk_root) {
 }
 
 my $python_dir = "$install_dir/python";
-if ((-d $python_dir) and can_run("python2.7") and
-    (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
-  # egg_info failed, probably when it asked git for a build tag.
-  # Specify no build tag.
-  open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
-  print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
-  close($pysdk_cfg);
+if ((-d $python_dir) and can_run("python2.7")) {
+  open(my $egg_info_pipe, "-|",
+       "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
+  my @egg_info_errors = <$egg_info_pipe>;
+  close($egg_info_pipe);
+
+  if ($?) {
+    if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
+      # egg_info apparently failed because it couldn't ask git for a build tag.
+      # Specify no build tag.
+      open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
+      print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
+      close($pysdk_cfg);
+    } else {
+      my $egg_info_exit = $? >> 8;
+      foreach my $errline (@egg_info_errors) {
+        warn $errline;
+      }
+      warn "python setup.py egg_info failed: exit $egg_info_exit";
+      exit ($egg_info_exit || 1);
+    }
+  }
 }
 
 # Hide messages from the install script (unless it fails: shell_or_die