Merge branch '11917-dont-clear-cache'
[arvados.git] / sdk / cli / bin / crunch-job
index b2c14d5b713dbfaa19c17b5d324ea4c81c1df9b8..afca52cb73f1809d5f9f25cca69ba28be8becab1 100755 (executable)
@@ -1,4 +1,8 @@
 #!/usr/bin/env perl
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
 
 =head1 NAME
@@ -126,14 +130,18 @@ my $jobspec;
 my $job_api_token;
 my $no_clear_tmp;
 my $resume_stash;
+my $cgroup_root = "/sys/fs/cgroup";
 my $docker_bin = "docker.io";
+my $docker_run_args = "";
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
            'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
+           'cgroup-root=s' => \$cgroup_root,
            'docker-bin=s' => \$docker_bin,
+           'docker-run-args=s' => \$docker_run_args,
     );
 
 if (defined $job_api_token) {
@@ -181,11 +189,12 @@ if (($Job || $local_job)->{docker_image_locator}) {
   $cmd = [$docker_bin, 'ps', '-q'];
 }
 Log(undef, "Sanity check is `@$cmd`");
-srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
-     $cmd,
-     {fork => 1});
-if ($? != 0) {
-  Log(undef, "Sanity check failed: ".exit_status_s($?));
+my ($exited, $stdout, $stderr) = srun_sync(
+  ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+  $cmd,
+  {label => "sanity check"});
+if ($exited != 0) {
+  Log(undef, "Sanity check failed: ".exit_status_s($exited));
   exit EX_TEMPFAIL;
 }
 Log(undef, "Sanity check OK");
@@ -350,6 +359,7 @@ my @jobstep_done = ();
 my @jobstep_tomerge = ();
 my $jobstep_tomerge_level = 0;
 my $squeue_checked = 0;
+my $sinfo_checked = 0;
 my $latest_refresh = scalar time;
 
 
@@ -384,28 +394,17 @@ my $nodelist = join(",", @node);
 my $git_tar_count = 0;
 
 if (!defined $no_clear_tmp) {
-  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
-  Log (undef, "Clean work dirs");
-
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    # Find FUSE mounts under $CRUNCH_TMP and unmount them.
-    # Then clean up work directories.
-    # TODO: When #5036 is done and widely deployed, we can limit mount's
-    # -t option to simply fuse.keep.
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
-  }
-  if ($?) {
-    Log(undef, "Clean work dirs: exit ".exit_status_s($?));
+  # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
+  # up work directories crunch_tmp/work, crunch_tmp/opt,
+  # crunch_tmp/src*.
+  my ($exited, $stdout, $stderr) = srun_sync(
+    ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+    ['bash', '-ec', q{
+arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
+rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
+    }],
+    {label => "clean work dirs"});
+  if ($exited != 0) {
     exit(EX_RETRY_UNLOCKED);
   }
 }
@@ -413,41 +412,48 @@ if (!defined $no_clear_tmp) {
 # If this job requires a Docker image, install that.
 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
 if ($docker_locator = $Job->{docker_image_locator}) {
+  Log (undef, "Install docker image $docker_locator");
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
   {
     croak("No Docker image hash found from locator $docker_locator");
   }
+  Log (undef, "docker image hash is $docker_hash");
   $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
-    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+loaded() {
+  id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
+  echo "image ID is \$id"
+  [[ \${id} = \Q$docker_hash\E ]]
+}
+if loaded >&2 2>/dev/null; then
+  echo >&2 "image is already present"
+  exit 0
 fi
+echo >&2 "docker image is not present; loading"
+arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+if ! loaded >&2; then
+  echo >&2 "`docker load` exited 0, but image is not found (!)"
+  exit 1
+fi
+echo >&2 "image loaded successfully"
 };
-  my $docker_pid = fork();
-  if ($docker_pid == 0)
-  {
-    srun (["srun", "--nodelist=" . join(',', @node)],
-          ["/bin/sh", "-ec", $docker_install_script]);
-    exit ($?);
-  }
-  while (1)
-  {
-    last if $docker_pid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($docker_pid);
-    select (undef, undef, undef, 0.1);
-  }
-  if ($? != 0)
+
+  my ($exited, $stdout, $stderr) = srun_sync(
+    ["srun", "--nodelist=" . join(',', @node)],
+    ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
+    {label => "load docker image"});
+  if ($exited != 0)
   {
-    croak("Installing Docker image from $docker_locator exited "
-          .exit_status_s($?));
+    exit(EX_RETRY_UNLOCKED);
   }
 
   # Determine whether this version of Docker supports memory+swap limits.
-  srun(["srun", "--nodelist=" . $node[0]],
-       ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
-      {fork => 1});
-  $docker_limitmem = ($? == 0);
+  ($exited, $stdout, $stderr) = srun_sync(
+    ["srun", "--nodes=1"],
+    [$docker_bin, 'run', '--help'],
+    {label => "check --memory-swap feature"});
+  $docker_limitmem = ($stdout =~ /--memory-swap/);
 
   # Find a non-root Docker user to use.
   # Tries the default user for the container, then 'crunch', then 'nobody',
@@ -457,20 +463,22 @@ fi
   # Docker containers.
   my @tryusers = ("", "crunch", "nobody");
   foreach my $try_user (@tryusers) {
+    my $label;
     my $try_user_arg;
     if ($try_user eq "") {
-      Log(undef, "Checking if container default user is not UID 0");
+      $label = "check whether default user is UID 0";
       $try_user_arg = "";
     } else {
-      Log(undef, "Checking if user '$try_user' is not UID 0");
+      $label = "check whether user '$try_user' is UID 0";
       $try_user_arg = "--user=$try_user";
     }
-    srun(["srun", "--nodelist=" . $node[0]],
-         ["/bin/sh", "-ec",
-          "a=`$docker_bin run $try_user_arg $docker_hash id --user` && " .
-          " test \$a -ne 0"],
-         {fork => 1});
-    if ($? == 0) {
+    my ($exited, $stdout, $stderr) = srun_sync(
+      ["srun", "--nodes=1"],
+      ["/bin/sh", "-ec",
+       "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
+      {label => $label});
+    chomp($stdout);
+    if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
       $dockeruserarg = $try_user_arg;
       if ($try_user eq "") {
         Log(undef, "Container will run with default user");
@@ -660,11 +668,9 @@ if (!defined $git_archive) {
   }
 }
 else {
-  my $install_exited;
+  my $exited;
   my $install_script_tries_left = 3;
   for (my $attempts = 0; $attempts < 3; $attempts++) {
-    Log(undef, "Run install script on all workers");
-
     my @srunargs = ("srun",
                     "--nodelist=$nodelist",
                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
@@ -672,59 +678,21 @@ else {
                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
 
     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
-    my ($install_stderr_r, $install_stderr_w);
-    pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
-    set_nonblocking($install_stderr_r);
-    my $installpid = fork();
-    if ($installpid == 0)
-    {
-      close($install_stderr_r);
-      fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
-      open(STDOUT, ">&", $install_stderr_w);
-      open(STDERR, ">&", $install_stderr_w);
-      srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
-      exit (1);
-    }
-    close($install_stderr_w);
-    # Tell freeze_if_want_freeze how to kill the child, otherwise the
-    # "waitpid(installpid)" loop won't get interrupted by a freeze:
-    $proc{$installpid} = {};
-    my $stderr_buf = '';
-    # Track whether anything appears on stderr other than slurm errors
-    # ("srun: ...") and the "starting: ..." message printed by the
-    # srun subroutine itself:
+    my ($stdout, $stderr);
+    ($exited, $stdout, $stderr) = srun_sync(
+      \@srunargs, \@execargs,
+      {label => "run install script on all workers"},
+      $build_script . $git_archive);
+
     my $stderr_anything_from_script = 0;
-    my $match_our_own_errors = '^(srun: error: |starting: \[)';
-    while ($installpid != waitpid(-1, WNOHANG)) {
-      freeze_if_want_freeze ($installpid);
-      # Wait up to 0.1 seconds for something to appear on stderr, then
-      # do a non-blocking read.
-      my $bits = fhbits($install_stderr_r);
-      select ($bits, undef, $bits, 0.1);
-      if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
-      {
-        while ($stderr_buf =~ /^(.*?)\n/) {
-          my $line = $1;
-          substr $stderr_buf, 0, 1+length($line), "";
-          Log(undef, "stderr $line");
-          if ($line !~ /$match_our_own_errors/) {
-            $stderr_anything_from_script = 1;
-          }
-        }
-      }
-    }
-    delete $proc{$installpid};
-    $install_exited = $?;
-    close($install_stderr_r);
-    if (length($stderr_buf) > 0) {
-      if ($stderr_buf !~ /$match_our_own_errors/) {
+    for my $line (split(/\n/, $stderr)) {
+      if ($line !~ /^(srun: error: |starting: \[)/) {
         $stderr_anything_from_script = 1;
       }
-      Log(undef, "stderr $stderr_buf")
     }
 
-    Log (undef, "Install script exited ".exit_status_s($install_exited));
-    last if $install_exited == 0 || $main::please_freeze;
+    last if $exited == 0 || $main::please_freeze;
+
     # If the install script fails but doesn't print an error message,
     # the next thing anyone is likely to do is just run it again in
     # case it was a transient problem like "slurm communication fails
@@ -740,7 +708,7 @@ else {
     unlink($tar_filename);
   }
 
-  if ($install_exited != 0) {
+  if ($exited != 0) {
     croak("Giving up");
   }
 }
@@ -755,6 +723,15 @@ foreach (split (/\n/, $Job->{knobs}))
 {
   Log (undef, "knob " . $_);
 }
+my $resp = api_call(
+  'nodes/list',
+  'filters' => [['hostname', 'in', \@node]],
+  'order' => 'hostname',
+  'limit' => scalar(@node),
+    );
+for my $n (@{$resp->{items}}) {
+  Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
+}
 
 
 
@@ -799,6 +776,7 @@ if ($initial_tasks_this_level < @node) {
   @freeslot = (0..$#slot);
 }
 my $round_num_freeslots = scalar(@freeslot);
+print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
 
 my %round_max_slots = ();
 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
@@ -870,11 +848,12 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
     $ENV{"HOME"} = $ENV{"TASK_WORK"};
-    $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
 
+    my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
+
     $ENV{"GZIP"} = "-n";
 
     my @srunargs = (
@@ -892,22 +871,30 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
 
     my $command =
-       "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
-        ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
-       ."&& cd $ENV{CRUNCH_TMP} "
+       "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
+        ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
+       ."&& cd \Q$ENV{CRUNCH_TMP}\E "
         # These environment variables get used explicitly later in
         # $command.  No tool is expected to read these values directly.
         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
-        ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
-    $command .= "&& exec arv-mount --by-pdh --crunchstat-interval=10 --allow-other $arv_file_cache $ENV{TASK_KEEPMOUNT} --exec ";
+        ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
+        .q{&& declare -a VOLUMES=() }
+        .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
+        .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
+        .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
+
+    $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
+    $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
+    $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
+
     if ($docker_hash)
     {
       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
-      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
-      $command .= "$docker_bin run --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
+      $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+      $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
       # We only set memory limits if Docker lets us limit both memory and swap.
       # Memory limits alone have been supported longer, but subprocesses tend
       # to get SIGKILL if they exceed that without any swap limit set.
@@ -922,14 +909,18 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
 
-      # Currently, we make arv-mount's mount point appear at /keep
-      # inside the container (instead of using the same path as the
-      # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
-      # crunch scripts and utilities must not rely on this. They must
-      # use $TASK_KEEPMOUNT.
+      # Currently, we make the "by_pdh" directory in arv-mount's mount
+      # point appear at /keep inside the container (instead of using
+      # the same path as the host like we do with CRUNCH_SRC and
+      # CRUNCH_INSTALL). However, crunch scripts and utilities must
+      # not rely on this. They must use $TASK_KEEPMOUNT.
       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
       $ENV{TASK_KEEPMOUNT} = "/keep";
 
+      # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
+      $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
+      $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
+
       # TASK_WORK is almost exactly like a docker data volume: it
       # starts out empty, is writable, and persists until no
       # containers use it any more. We don't use --volumes-from to
@@ -957,6 +948,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       # For now, use the same approach as TASK_WORK above.
       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
 
+      # Bind mount the crunchrunner binary and host TLS certificates file into
+      # the container.
+      $command .= '"${VOLUMES[@]}" ';
+
       while (my ($env_key, $env_val) = each %ENV)
       {
         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
@@ -982,7 +977,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       }
     } else {
       # Non-docker run
-      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
+      $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
       $command .= $stdbuf;
       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     }
@@ -1000,11 +995,12 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     next;
   }
   shift @freeslot;
-  $proc{$childpid} = { jobstep => $id,
-                      time => time,
-                      slot => $childslot,
-                      jobstepname => "$job_id.$id.$childpid",
-                    };
+  $proc{$childpid} = {
+    jobstepidx => $id,
+    time => time,
+    slot => $childslot,
+    jobstepname => "$job_id.$id.$childpid",
+  };
   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
   $slot[$childslot]->{pid} = $childpid;
 
@@ -1046,12 +1042,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       check_refresh_wanted();
       check_squeue();
       update_progress_stats();
-      select (undef, undef, undef, 0.1);
     }
     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
     {
       update_progress_stats();
     }
+    if (!$gotsome) {
+      select (undef, undef, undef, 0.1);
+    }
     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
                                         $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
@@ -1136,10 +1134,10 @@ freeze();
 my $collated_output = save_output_collection();
 Log (undef, "finish");
 
-save_meta();
+my $final_log = save_meta();
 
 my $final_state;
-if ($collated_output && $main::success) {
+if ($collated_output && $final_log && $main::success) {
   $final_state = 'Complete';
 } else {
   $final_state = 'Failed';
@@ -1170,109 +1168,119 @@ sub update_progress_stats
 
 sub reapchildren
 {
-  my $pid = waitpid (-1, WNOHANG);
-  return 0 if $pid <= 0;
-
-  my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
-                 . "."
-                 . $slot[$proc{$pid}->{slot}]->{cpu});
-  my $jobstepid = $proc{$pid}->{jobstep};
-  my $elapsed = time - $proc{$pid}->{time};
-  my $Jobstep = $jobstep[$jobstepid];
-
-  my $childstatus = $?;
-  my $exitvalue = $childstatus >> 8;
-  my $exitinfo = "exit ".exit_status_s($childstatus);
-  $Jobstep->{'arvados_task'}->reload;
-  my $task_success = $Jobstep->{'arvados_task'}->{success};
-
-  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
-
-  if (!defined $task_success) {
-    # task did not indicate one way or the other --> fail
-    Log($jobstepid, sprintf(
-          "ERROR: Task process exited %d, but never updated its task record to indicate success and record its output.",
-          exit_status_s($childstatus)));
-    $Jobstep->{'arvados_task'}->{success} = 0;
-    $Jobstep->{'arvados_task'}->save;
-    $task_success = 0;
-  }
+  my $children_reaped = 0;
+  my @successful_task_uuids = ();
 
-  if (!$task_success)
+  while((my $pid = waitpid (-1, WNOHANG)) > 0)
   {
-    my $temporary_fail;
-    $temporary_fail ||= $Jobstep->{tempfail};
-    $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
-
-    ++$thisround_failed;
-    ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
-
-    # Check for signs of a failed or misconfigured node
-    if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
-       2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
-      # Don't count this against jobstep failure thresholds if this
-      # node is already suspected faulty and srun exited quickly
-      if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
-         $elapsed < 5) {
-       Log ($jobstepid, "blaming failure on suspect node " .
-             $slot[$proc{$pid}->{slot}]->{node}->{name});
-        $temporary_fail ||= 1;
-      }
-      ban_node_by_slot($proc{$pid}->{slot});
+    my $childstatus = $?;
+
+    my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
+                    . "."
+                    . $slot[$proc{$pid}->{slot}]->{cpu});
+    my $jobstepidx = $proc{$pid}->{jobstepidx};
+
+    $children_reaped++;
+    my $elapsed = time - $proc{$pid}->{time};
+    my $Jobstep = $jobstep[$jobstepidx];
+
+    my $exitvalue = $childstatus >> 8;
+    my $exitinfo = "exit ".exit_status_s($childstatus);
+    $Jobstep->{'arvados_task'}->reload;
+    my $task_success = $Jobstep->{'arvados_task'}->{success};
+
+    Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
+
+    if (!defined $task_success) {
+      # task did not indicate one way or the other --> fail
+      Log($jobstepidx, sprintf(
+            "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
+            exit_status_s($childstatus)));
+      $Jobstep->{'arvados_task'}->{success} = 0;
+      $Jobstep->{'arvados_task'}->save;
+      $task_success = 0;
     }
 
-    Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
-                             ++$Jobstep->{'failures'},
-                             $temporary_fail ? 'temporary' : 'permanent',
-                             $elapsed));
+    if (!$task_success)
+    {
+      my $temporary_fail;
+      $temporary_fail ||= $Jobstep->{tempfail};
+      $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
+
+      ++$thisround_failed;
+      ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
+
+      # Check for signs of a failed or misconfigured node
+      if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
+          2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
+        # Don't count this against jobstep failure thresholds if this
+        # node is already suspected faulty and srun exited quickly
+        if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
+            $elapsed < 5) {
+          Log ($jobstepidx, "blaming failure on suspect node " .
+               $slot[$proc{$pid}->{slot}]->{node}->{name});
+          $temporary_fail ||= 1;
+        }
+        ban_node_by_slot($proc{$pid}->{slot});
+      }
 
-    if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
-      # Give up on this task, and the whole job
-      $main::success = 0;
+      Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
+                                ++$Jobstep->{'failures'},
+                                $temporary_fail ? 'temporary' : 'permanent',
+                                $elapsed));
+
+      if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+        # Give up on this task, and the whole job
+        $main::success = 0;
+      }
+      # Put this task back on the todo queue
+      push @jobstep_todo, $jobstepidx;
+      $Job->{'tasks_summary'}->{'failed'}++;
     }
-    # Put this task back on the todo queue
-    push @jobstep_todo, $jobstepid;
-    $Job->{'tasks_summary'}->{'failed'}++;
+    else # task_success
+    {
+      push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
+      ++$thisround_succeeded;
+      $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
+      $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+      $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
+      push @jobstep_done, $jobstepidx;
+      Log ($jobstepidx, "success in $elapsed seconds");
+    }
+    $Jobstep->{exitcode} = $childstatus;
+    $Jobstep->{finishtime} = time;
+    $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
+    $Jobstep->{'arvados_task'}->save;
+    process_stderr_final ($jobstepidx);
+    Log ($jobstepidx, sprintf("task output (%d bytes): %s",
+                              length($Jobstep->{'arvados_task'}->{output}),
+                              $Jobstep->{'arvados_task'}->{output}));
+
+    close $reader{$jobstepidx};
+    delete $reader{$jobstepidx};
+    delete $slot[$proc{$pid}->{slot}]->{pid};
+    push @freeslot, $proc{$pid}->{slot};
+    delete $proc{$pid};
+
+    $progress_is_dirty = 1;
   }
-  else
+
+  if (scalar(@successful_task_uuids) > 0)
   {
-    ++$thisround_succeeded;
-    $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
-    $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
-    $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
-    push @jobstep_done, $jobstepid;
-    Log ($jobstepid, "success in $elapsed seconds");
-  }
-  $Jobstep->{exitcode} = $childstatus;
-  $Jobstep->{finishtime} = time;
-  $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
-  $Jobstep->{'arvados_task'}->save;
-  process_stderr ($jobstepid, $task_success);
-  Log ($jobstepid, sprintf("task output (%d bytes): %s",
-                           length($Jobstep->{'arvados_task'}->{output}),
-                           $Jobstep->{'arvados_task'}->{output}));
-
-  close $reader{$jobstepid};
-  delete $reader{$jobstepid};
-  delete $slot[$proc{$pid}->{slot}]->{pid};
-  push @freeslot, $proc{$pid}->{slot};
-  delete $proc{$pid};
-
-  if ($task_success) {
+    Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
     # Load new tasks
     my $newtask_list = [];
     my $newtask_results;
     do {
       $newtask_results = api_call(
         "job_tasks/list",
-        'where' => {
-          'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-        },
+        'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
         'order' => 'qsequence',
         'offset' => scalar(@$newtask_list),
-      );
+          );
       push(@$newtask_list, @{$newtask_results->{items}});
     } while (@{$newtask_results->{items}});
+    Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
     foreach my $arvados_task (@$newtask_list) {
       my $jobstep = {
         'level' => $arvados_task->{'sequence'},
@@ -1284,14 +1292,16 @@ sub reapchildren
     }
   }
 
-  $progress_is_dirty = 1;
-  1;
+  return $children_reaped;
 }
 
 sub check_refresh_wanted
 {
   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
-  if (@stat && $stat[9] > $latest_refresh) {
+  if (@stat &&
+      $stat[9] > $latest_refresh &&
+      # ...and we have actually locked the job record...
+      $job_id eq $Job->{'uuid'}) {
     $latest_refresh = scalar time;
     my $Job2 = api_call("jobs/get", uuid => $jobspec);
     for my $attr ('cancelled_at',
@@ -1329,9 +1339,13 @@ sub check_squeue
   # squeue check interval (15s) this should make the squeue check an
   # infrequent event.
   my $silent_procs = 0;
-  for my $jobstep (values %proc)
+  for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
   {
-    if ($jobstep->{stderr_at} < $last_squeue_check)
+    if (!exists($js->{stderr_at}))
+    {
+      $js->{stderr_at} = 0;
+    }
+    if ($js->{stderr_at} < $last_squeue_check)
     {
       $silent_procs++;
     }
@@ -1339,17 +1353,18 @@ sub check_squeue
   return if $silent_procs == 0;
 
   # use killem() on procs whose killtime is reached
-  while (my ($pid, $jobstep) = each %proc)
+  while (my ($pid, $procinfo) = each %proc)
   {
-    if (exists $jobstep->{killtime}
-        && $jobstep->{killtime} <= time
-        && $jobstep->{stderr_at} < $last_squeue_check)
+    my $js = $jobstep[$procinfo->{jobstepidx}];
+    if (exists $procinfo->{killtime}
+        && $procinfo->{killtime} <= time
+        && $js->{stderr_at} < $last_squeue_check)
     {
       my $sincewhen = "";
-      if ($jobstep->{stderr_at}) {
-        $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+      if ($js->{stderr_at}) {
+        $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
       }
-      Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+      Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
       killem ($pid);
     }
   }
@@ -1384,12 +1399,12 @@ sub check_squeue
   }
 
   # Check for child procs >60s old and not mentioned by squeue.
-  while (my ($pid, $jobstep) = each %proc)
+  while (my ($pid, $procinfo) = each %proc)
   {
-    if ($jobstep->{time} < time - 60
-        && $jobstep->{jobstepname}
-        && !exists $ok{$jobstep->{jobstepname}}
-        && !exists $jobstep->{killtime})
+    if ($procinfo->{time} < time - 60
+        && $procinfo->{jobstepname}
+        && !exists $ok{$procinfo->{jobstepname}}
+        && !exists $procinfo->{killtime})
     {
       # According to slurm, this task has ended (successfully or not)
       # -- but our srun child hasn't exited. First we must wait (30
@@ -1398,12 +1413,43 @@ sub check_squeue
       # terminated, we'll conclude some slurm communication
       # error/delay has caused the task to die without notifying srun,
       # and we'll kill srun ourselves.
-      $jobstep->{killtime} = time + 30;
-      Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
+      $procinfo->{killtime} = time + 30;
+      Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
     }
   }
 }
 
+sub check_sinfo
+{
+  # If a node fails in a multi-node "srun" call during job setup, the call
+  # may hang instead of exiting with a nonzero code.  This function checks
+  # "sinfo" for the health of the nodes that were allocated and ensures that
+  # they are all still in the "alloc" state.  If a node that is allocated to
+  # this job is not in "alloc" state, then set please_freeze.
+  #
+  # This is only called from srun_sync() for node configuration.  If a
+  # node fails doing actual work, there are other recovery mechanisms.
+
+  # Do not call `sinfo` more than once every 15 seconds.
+  return if $sinfo_checked > time - 15;
+  $sinfo_checked = time;
+
+  # The output format "%t" means output node states.
+  my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
+  if ($? != 0)
+  {
+    Log(undef, "warning: sinfo exit status $? ($!)");
+    return;
+  }
+  chop @sinfo;
+
+  foreach (@sinfo)
+  {
+    if ($_ != "alloc" && $_ != "alloc*") {
+      $main::please_freeze = 1;
+    }
+  }
+}
 
 sub release_allocation
 {
@@ -1418,64 +1464,105 @@ sub release_allocation
 sub readfrompipes
 {
   my $gotsome = 0;
-  foreach my $job (keys %reader)
+  my %fd_job;
+  my $sel = IO::Select->new();
+  foreach my $jobstepidx (keys %reader)
+  {
+    my $fd = $reader{$jobstepidx};
+    $sel->add($fd);
+    $fd_job{$fd} = $jobstepidx;
+
+    if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
+      $sel->add($stdout_fd);
+      $fd_job{$stdout_fd} = $jobstepidx;
+    }
+  }
+  # select on all reader fds with 0.1s timeout
+  my @ready_fds = $sel->can_read(0.1);
+  foreach my $fd (@ready_fds)
   {
     my $buf;
-    while (0 < sysread ($reader{$job}, $buf, 8192))
+    if (0 < sysread ($fd, $buf, 65536))
     {
+      $gotsome = 1;
       print STDERR $buf if $ENV{CRUNCH_DEBUG};
-      $jobstep[$job]->{stderr_at} = time;
-      $jobstep[$job]->{stderr} .= $buf;
-      preprocess_stderr ($job);
-      if (length ($jobstep[$job]->{stderr}) > 16384)
+
+      my $jobstepidx = $fd_job{$fd};
+      if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
+        $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+        next;
+      }
+
+      $jobstep[$jobstepidx]->{stderr_at} = time;
+      $jobstep[$jobstepidx]->{stderr} .= $buf;
+
+      # Consume everything up to the last \n
+      preprocess_stderr ($jobstepidx);
+
+      if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
       {
-       substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+        # If we get a lot of stderr without a newline, chop off the
+        # front to avoid letting our buffer grow indefinitely.
+        substr ($jobstep[$jobstepidx]->{stderr},
+                0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
       }
-      $gotsome = 1;
     }
   }
   return $gotsome;
 }
 
 
+# Consume all full lines of stderr for a jobstep. Everything after the
+# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
+# returning.
 sub preprocess_stderr
 {
-  my $job = shift;
+  my $jobstepidx = shift;
+  # slotindex is only defined for children running Arvados job tasks.
+  # Be prepared to handle the undef case (for setup srun calls, etc.).
+  my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
 
-  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
+  while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
     my $line = $1;
-    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
-    Log ($job, "stderr $line");
-    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
-      # whoa.
+    substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
+    Log ($jobstepidx, "stderr $line");
+    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
+      # If the allocation is revoked, we can't possibly continue, so mark all
+      # nodes as failed.  This will cause the overall exit code to be
+      # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
+      # this job.
       $main::please_freeze = 1;
+      foreach my $st (@slot) {
+        $st->{node}->{fail_count}++;
+      }
     }
-    elsif ($line =~ /srun: error: Node failure on/) {
-      my $job_slot_index = $jobstep[$job]->{slotindex};
-      $slot[$job_slot_index]->{node}->{fail_count}++;
-      $jobstep[$job]->{tempfail} = 1;
-      ban_node_by_slot($job_slot_index);
+    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
+      $jobstep[$jobstepidx]->{tempfail} = 1;
+      if (defined($job_slot_index)) {
+        $slot[$job_slot_index]->{node}->{fail_count}++;
+        ban_node_by_slot($job_slot_index);
+      }
     }
-    elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
-      $jobstep[$job]->{tempfail} = 1;
-      ban_node_by_slot($jobstep[$job]->{slotindex});
+    elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
+      $jobstep[$jobstepidx]->{tempfail} = 1;
+      ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
     }
-    elsif ($line =~ /arvados\.errors\.Keep/) {
-      $jobstep[$job]->{tempfail} = 1;
+    elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
+      $jobstep[$jobstepidx]->{tempfail} = 1;
     }
   }
 }
 
 
-sub process_stderr
+sub process_stderr_final
 {
-  my $job = shift;
-  my $task_success = shift;
-  preprocess_stderr ($job);
+  my $jobstepidx = shift;
+  preprocess_stderr ($jobstepidx);
 
   map {
-    Log ($job, "stderr $_");
-  } split ("\n", $jobstep[$job]->{stderr});
+    Log ($jobstepidx, "stderr $_");
+  } split ("\n", $jobstep[$jobstepidx]->{stderr});
+  $jobstep[$jobstepidx]->{stderr} = '';
 }
 
 sub fetch_block
@@ -1523,9 +1610,10 @@ sub create_output_collection
 import arvados
 import sys
 print (arvados.api("v1").collections().
-       create(body={"manifest_text": sys.stdin.read()}).
+       create(body={"manifest_text": sys.stdin.read(),
+                    "owner_uuid": sys.argv[2]}).
        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
-}, retry_count());
+}, retry_count(), $Job->{owner_uuid});
 
   my $task_idx = -1;
   my $manifest_size = 0;
@@ -1613,7 +1701,7 @@ sub killem
     }
     if (!exists $proc{$_}->{"sent_$sig"})
     {
-      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
+      Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
       kill $sig, $_;
       select (undef, undef, undef, 0.1);
       if ($sig == 2)
@@ -1676,7 +1764,7 @@ sub log_writer_start($)
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
                         'arv-put',
                         '--stream',
-                        '--retries', '3',
+                        '--retries', '6',
                         '--filename', $logfilename,
                         '-');
   $log_pipe_out_buf = "";
@@ -1710,7 +1798,7 @@ sub log_writer_finish()
   close($log_pipe_in);
 
   my $logger_failed = 0;
-  my $read_result = log_writer_read_output(120);
+  my $read_result = log_writer_read_output(600);
   if ($read_result == -1) {
     $logger_failed = -1;
     Log (undef, "timed out reading from 'arv-put'");
@@ -1737,16 +1825,21 @@ sub log_writer_is_active() {
   return $log_pipe_pid;
 }
 
-sub Log                                # ($jobstep_id, $logmessage)
+sub Log                                # ($jobstepidx, $logmessage)
 {
-  if ($_[1] =~ /\n/) {
+  my ($jobstepidx, $logmessage) = @_;
+  if ($logmessage =~ /\n/) {
     for my $line (split (/\n/, $_[1])) {
-      Log ($_[0], $line);
+      Log ($jobstepidx, $line);
     }
     return;
   }
   my $fh = select STDERR; $|=1; select $fh;
-  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
+  my $task_qseq = '';
+  if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
+    $task_qseq = $jobstepidx;
+  }
+  my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
   $message .= "\n";
   my $datetime;
@@ -1768,6 +1861,7 @@ sub croak
   my ($package, $file, $line) = caller;
   my $message = "@_ at $file line $line\n";
   Log (undef, $message);
+  release_allocation();
   freeze() if @jobstep_todo;
   create_output_collection() if @jobstep_todo;
   cleanup();
@@ -1808,6 +1902,8 @@ sub save_meta
     });
   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
+
+  return $log_coll->{portable_data_hash};
 }
 
 
@@ -1869,6 +1965,88 @@ sub freezeunquote
   return $s;
 }
 
+sub srun_sync
+{
+  my $srunargs = shift;
+  my $execargs = shift;
+  my $opts = shift || {};
+  my $stdin = shift;
+
+  my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
+  Log (undef, "$label: start");
+
+  my ($stderr_r, $stderr_w);
+  pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
+
+  my ($stdout_r, $stdout_w);
+  pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
+
+  my $srunpid = fork();
+  if ($srunpid == 0)
+  {
+    close($stderr_r);
+    close($stdout_r);
+    fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+    fcntl($stdout_w, F_SETFL, 0) or croak($!);
+    open(STDERR, ">&", $stderr_w);
+    open(STDOUT, ">&", $stdout_w);
+    srun ($srunargs, $execargs, $opts, $stdin);
+    exit (1);
+  }
+  close($stderr_w);
+  close($stdout_w);
+
+  set_nonblocking($stderr_r);
+  set_nonblocking($stdout_r);
+
+  # Add entries to @jobstep and %proc so check_squeue() and
+  # freeze_if_want_freeze() can treat it like a job task process.
+  push @jobstep, {
+    stderr => '',
+    stderr_at => 0,
+    stderr_captured => '',
+    stdout_r => $stdout_r,
+    stdout_captured => '',
+  };
+  my $jobstepidx = $#jobstep;
+  $proc{$srunpid} = {
+    jobstepidx => $jobstepidx,
+  };
+  $reader{$jobstepidx} = $stderr_r;
+
+  while ($srunpid != waitpid ($srunpid, WNOHANG)) {
+    my $busy = readfrompipes();
+    if (!$busy || ($latest_refresh + 2 < scalar time)) {
+      check_refresh_wanted();
+      check_squeue();
+      check_sinfo();
+    }
+    if (!$busy) {
+      select(undef, undef, undef, 0.1);
+    }
+    killem(keys %proc) if $main::please_freeze;
+  }
+  my $exited = $?;
+
+  1 while readfrompipes();
+  process_stderr_final ($jobstepidx);
+
+  Log (undef, "$label: exit ".exit_status_s($exited));
+
+  close($stdout_r);
+  close($stderr_r);
+  delete $proc{$srunpid};
+  delete $reader{$jobstepidx};
+
+  my $j = pop @jobstep;
+  # If the srun showed signs of tempfail, ensure the caller treats that as a
+  # failure case.
+  if ($main::please_freeze || $j->{tempfail}) {
+    $exited ||= 255;
+  }
+  return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+}
+
 
 sub srun
 {
@@ -1947,7 +2125,7 @@ sub find_docker_image {
       }
     }
   }
-  if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
+  if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
     return ($streamname, $1);
   } else {
     return (undef, undef);