8099: 7263: Merge branch 'hgi/7263-even-better-busy-behavior' of github.com:wtsi...
authorTom Clegg <tom@curoverse.com>
Mon, 29 Feb 2016 21:00:39 +0000 (16:00 -0500)
committerTom Clegg <tom@curoverse.com>
Mon, 29 Feb 2016 21:00:39 +0000 (16:00 -0500)
Conflicts:
sdk/cli/bin/crunch-job

1  2 
sdk/cli/bin/crunch-job

diff --combined sdk/cli/bin/crunch-job
index b63886e10504a725b8a7374f5861e2a033b2b57c,c8a1de9c653837ac4d7a059a5dce97483ccc6602..ca9db1dacdb162d73ee6eb53bb79d5ec2d550014
@@@ -183,12 -183,11 +183,12 @@@ if (($Job || $local_job)->{docker_image
    $cmd = [$docker_bin, 'ps', '-q'];
  }
  Log(undef, "Sanity check is `@$cmd`");
 -srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
 -     $cmd,
 -     {fork => 1});
 -if ($? != 0) {
 -  Log(undef, "Sanity check failed: ".exit_status_s($?));
 +my ($exited, $stdout, $stderr) = srun_sync(
 +  ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
 +  $cmd,
 +  {label => "sanity check"});
 +if ($exited != 0) {
 +  Log(undef, "Sanity check failed: ".exit_status_s($exited));
    exit EX_TEMPFAIL;
  }
  Log(undef, "Sanity check OK");
@@@ -387,17 -386,28 +387,17 @@@ my $nodelist = join(",", @node)
  my $git_tar_count = 0;
  
  if (!defined $no_clear_tmp) {
 -  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
 -  Log (undef, "Clean work dirs");
 -
 -  my $cleanpid = fork();
 -  if ($cleanpid == 0)
 -  {
 -    # Find FUSE mounts under $CRUNCH_TMP and unmount them.
 -    # Then clean up work directories.
 -    # TODO: When #5036 is done and widely deployed, we can limit mount's
 -    # -t option to simply fuse.keep.
 -    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
 -          ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
 -    exit (1);
 -  }
 -  while (1)
 -  {
 -    last if $cleanpid == waitpid (-1, WNOHANG);
 -    freeze_if_want_freeze ($cleanpid);
 -    select (undef, undef, undef, 0.1);
 -  }
 -  if ($?) {
 -    Log(undef, "Clean work dirs: exit ".exit_status_s($?));
 +  # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
 +  # up work directories crunch_tmp/work, crunch_tmp/opt,
 +  # crunch_tmp/src*.
 +  #
 +  # TODO: When #5036 is done and widely deployed, we can limit mount's
 +  # -t option to simply fuse.keep.
 +  my ($exited, $stdout, $stderr) = srun_sync(
 +    ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
 +    ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
 +    {label => "clean work dirs"});
 +  if ($exited != 0) {
      exit(EX_RETRY_UNLOCKED);
    }
  }
  # If this job requires a Docker image, install that.
  my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
  if ($docker_locator = $Job->{docker_image_locator}) {
 +  Log (undef, "Install docker image $docker_locator");
    ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
    if (!$docker_hash)
    {
      croak("No Docker image hash found from locator $docker_locator");
    }
 +  Log (undef, "docker image hash is $docker_hash");
    $docker_stream =~ s/^\.//;
    my $docker_install_script = qq{
  if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
      arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
  fi
  };
 -  my $docker_pid = fork();
 -  if ($docker_pid == 0)
 -  {
 -    srun (["srun", "--nodelist=" . join(',', @node)],
 -          ["/bin/sh", "-ec", $docker_install_script]);
 -    exit ($?);
 -  }
 -  while (1)
 -  {
 -    last if $docker_pid == waitpid (-1, WNOHANG);
 -    freeze_if_want_freeze ($docker_pid);
 -    select (undef, undef, undef, 0.1);
 -  }
 -  if ($? != 0)
 +
 +  my ($exited, $stdout, $stderr) = srun_sync(
 +    ["srun", "--nodelist=" . join(',', @node)],
 +    ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
 +    {label => "load docker image"});
 +  if ($exited != 0)
    {
 -    croak("Installing Docker image from $docker_locator exited "
 -          .exit_status_s($?));
 +    exit(EX_RETRY_UNLOCKED);
    }
  
    # Determine whether this version of Docker supports memory+swap limits.
 -  srun(["srun", "--nodelist=" . $node[0]],
 -       ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
 -      {fork => 1});
 -  $docker_limitmem = ($? == 0);
 +  ($exited, $stdout, $stderr) = srun_sync(
 +    ["srun", "--nodelist=" . $node[0]],
 +    [$docker_bin, 'run', '--help'],
 +    {label => "check --memory-swap feature"});
 +  $docker_limitmem = ($stdout =~ /--memory-swap/);
  
    # Find a non-root Docker user to use.
    # Tries the default user for the container, then 'crunch', then 'nobody',
    # Docker containers.
    my @tryusers = ("", "crunch", "nobody");
    foreach my $try_user (@tryusers) {
 +    my $label;
      my $try_user_arg;
      if ($try_user eq "") {
 -      Log(undef, "Checking if container default user is not UID 0");
 +      $label = "check whether default user is UID 0";
        $try_user_arg = "";
      } else {
 -      Log(undef, "Checking if user '$try_user' is not UID 0");
 +      $label = "check whether user '$try_user' is UID 0";
        $try_user_arg = "--user=$try_user";
      }
 -    srun(["srun", "--nodelist=" . $node[0]],
 -         ["/bin/sh", "-ec",
 -          "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
 -          " test \$a -ne 0"],
 -         {fork => 1});
 -    if ($? == 0) {
 +    my ($exited, $stdout, $stderr) = srun_sync(
 +      ["srun", "--nodelist=" . $node[0]],
 +      ["/bin/sh", "-ec",
 +       "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
 +      {label => $label});
 +    chomp($stdout);
 +    if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
        $dockeruserarg = $try_user_arg;
        if ($try_user eq "") {
          Log(undef, "Container will run with default user");
@@@ -648,9 -662,11 +648,9 @@@ if (!defined $git_archive) 
    }
  }
  else {
 -  my $install_exited;
 +  my $exited;
    my $install_script_tries_left = 3;
    for (my $attempts = 0; $attempts < 3; $attempts++) {
 -    Log(undef, "Run install script on all workers");
 -
      my @srunargs = ("srun",
                      "--nodelist=$nodelist",
                      "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
                      "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
  
      $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
 -    my ($install_stderr_r, $install_stderr_w);
 -    pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
 -    set_nonblocking($install_stderr_r);
 -    my $installpid = fork();
 -    if ($installpid == 0)
 -    {
 -      close($install_stderr_r);
 -      fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
 -      open(STDOUT, ">&", $install_stderr_w);
 -      open(STDERR, ">&", $install_stderr_w);
 -      srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
 -      exit (1);
 -    }
 -    close($install_stderr_w);
 -    # Tell freeze_if_want_freeze how to kill the child, otherwise the
 -    # "waitpid(installpid)" loop won't get interrupted by a freeze:
 -    $proc{$installpid} = {};
 -    my $stderr_buf = '';
 -    # Track whether anything appears on stderr other than slurm errors
 -    # ("srun: ...") and the "starting: ..." message printed by the
 -    # srun subroutine itself:
 +    my ($stdout, $stderr);
 +    ($exited, $stdout, $stderr) = srun_sync(
 +      \@srunargs, \@execargs,
 +      {label => "run install script on all workers"},
 +      $build_script . $git_archive);
 +
      my $stderr_anything_from_script = 0;
 -    my $match_our_own_errors = '^(srun: error: |starting: \[)';
 -    while ($installpid != waitpid(-1, WNOHANG)) {
 -      freeze_if_want_freeze ($installpid);
 -      # Wait up to 0.1 seconds for something to appear on stderr, then
 -      # do a non-blocking read.
 -      my $bits = fhbits($install_stderr_r);
 -      select ($bits, undef, $bits, 0.1);
 -      if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
 -      {
 -        while ($stderr_buf =~ /^(.*?)\n/) {
 -          my $line = $1;
 -          substr $stderr_buf, 0, 1+length($line), "";
 -          Log(undef, "stderr $line");
 -          if ($line !~ /$match_our_own_errors/) {
 -            $stderr_anything_from_script = 1;
 -          }
 -        }
 -      }
 -    }
 -    delete $proc{$installpid};
 -    $install_exited = $?;
 -    close($install_stderr_r);
 -    if (length($stderr_buf) > 0) {
 -      if ($stderr_buf !~ /$match_our_own_errors/) {
 +    for my $line (split(/\n/, $stderr)) {
 +      if ($line !~ /^(srun: error: |starting: \[)/) {
          $stderr_anything_from_script = 1;
        }
 -      Log(undef, "stderr $stderr_buf")
      }
  
 -    Log (undef, "Install script exited ".exit_status_s($install_exited));
 -    last if $install_exited == 0 || $main::please_freeze;
 +    last if $exited == 0 || $main::please_freeze;
 +
      # If the install script fails but doesn't print an error message,
      # the next thing anyone is likely to do is just run it again in
      # case it was a transient problem like "slurm communication fails
      unlink($tar_filename);
    }
  
 -  if ($install_exited != 0) {
 +  if ($exited != 0) {
      croak("Giving up");
    }
  }
@@@ -747,6 -801,7 +747,7 @@@ if ($initial_tasks_this_level < @node) 
    @freeslot = (0..$#slot);
  }
  my $round_num_freeslots = scalar(@freeslot);
+ print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
  
  my %round_max_slots = ();
  for (my $ii = $#freeslot; $ii >= 0; $ii--) {
@@@ -957,12 -1012,11 +958,12 @@@ for (my $todo_ptr = 0; $todo_ptr <= $#j
      next;
    }
    shift @freeslot;
 -  $proc{$childpid} = { jobstep => $id,
 -                     time => time,
 -                     slot => $childslot,
 -                     jobstepname => "$job_id.$id.$childpid",
 -                   };
 +  $proc{$childpid} = {
 +    jobstepidx => $id,
 +    time => time,
 +    slot => $childslot,
 +    jobstepname => "$job_id.$id.$childpid",
 +  };
    croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
    $slot[$childslot]->{pid} = $childpid;
  
      {
        update_progress_stats();
      }
 +    if (!$gotsome) {
 +      select (undef, undef, undef, 0.1);
 +    }
      $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
                                          $_->{node}->{hold_count} < 4 } @slot);
      if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
@@@ -1130,131 -1181,139 +1131,142 @@@ sub update_progress_stat
  
  sub reapchildren
  {
-   my $pid = waitpid (-1, WNOHANG);
-   return 0 if $pid <= 0;
-   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
-                 . "."
-                 . $slot[$proc{$pid}->{slot}]->{cpu});
-   my $jobstepidx = $proc{$pid}->{jobstepidx};
-   my $elapsed = time - $proc{$pid}->{time};
-   my $Jobstep = $jobstep[$jobstepidx];
-   my $childstatus = $?;
-   my $exitvalue = $childstatus >> 8;
-   my $exitinfo = "exit ".exit_status_s($childstatus);
-   $Jobstep->{'arvados_task'}->reload;
-   my $task_success = $Jobstep->{'arvados_task'}->{success};
-   Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
-   if (!defined $task_success) {
-     # task did not indicate one way or the other --> fail
-     Log($jobstepidx, sprintf(
-           "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
-           exit_status_s($childstatus)));
-     $Jobstep->{'arvados_task'}->{success} = 0;
-     $Jobstep->{'arvados_task'}->save;
-     $task_success = 0;
-   }
-   if (!$task_success)
+   my $children_reaped = 0;
 -
 -  while((my $pid = waitpid (-1, WNOHANG)) > 0)
++  while ((my $pid = waitpid (-1, WNOHANG)) > 0)
    {
-     my $temporary_fail;
-     $temporary_fail ||= $Jobstep->{tempfail};
-     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
-     ++$thisround_failed;
-     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
-     # Check for signs of a failed or misconfigured node
-     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
-       2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
-       # Don't count this against jobstep failure thresholds if this
-       # node is already suspected faulty and srun exited quickly
-       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
-         $elapsed < 5) {
-       Log ($jobstepidx, "blaming failure on suspect node " .
-              $slot[$proc{$pid}->{slot}]->{node}->{name});
-         $temporary_fail ||= 1;
-       }
-       ban_node_by_slot($proc{$pid}->{slot});
+     my $childstatus = $?;
++
+     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
+                     . "."
+                     . $slot[$proc{$pid}->{slot}]->{cpu});
 -    my $jobstepid = $proc{$pid}->{jobstep};
++    my $jobstepidx = $proc{$pid}->{jobstepidx};
+     if (!WIFEXITED($childstatus))
+     {
+       # child did not exit (may be temporarily stopped)
 -      Log ($jobstepid, "child $pid did not actually exit in reapchildren, ignoring for now.");
++      Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
+       next;
      }
  
-     Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
-                              ++$Jobstep->{'failures'},
-                              $temporary_fail ? 'temporary' : 'permanent',
-                              $elapsed));
+     $children_reaped++;
+     my $elapsed = time - $proc{$pid}->{time};
 -    my $Jobstep = $jobstep[$jobstepid];
++    my $Jobstep = $jobstep[$jobstepidx];
+     my $exitvalue = $childstatus >> 8;
+     my $exitinfo = "exit ".exit_status_s($childstatus);
+     $Jobstep->{'arvados_task'}->reload;
+     my $task_success = $Jobstep->{'arvados_task'}->{success};
 -    Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
++    Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
+     if (!defined $task_success) {
+       # task did not indicate one way or the other --> fail
 -      Log($jobstepid, sprintf(
++      Log($jobstepidx, sprintf(
+             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
+             exit_status_s($childstatus)));
+       $Jobstep->{'arvados_task'}->{success} = 0;
+       $Jobstep->{'arvados_task'}->save;
+       $task_success = 0;
+     }
  
-     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
-       # Give up on this task, and the whole job
-       $main::success = 0;
+     if (!$task_success)
+     {
+       my $temporary_fail;
+       $temporary_fail ||= $Jobstep->{tempfail};
+       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
+       ++$thisround_failed;
+       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
+       # Check for signs of a failed or misconfigured node
+       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
+           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
+         # Don't count this against jobstep failure thresholds if this
+         # node is already suspected faulty and srun exited quickly
+         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
+             $elapsed < 5) {
 -          Log ($jobstepid, "blaming failure on suspect node " .
++          Log ($jobstepidx, "blaming failure on suspect node " .
+                $slot[$proc{$pid}->{slot}]->{node}->{name});
+           $temporary_fail ||= 1;
+         }
+         ban_node_by_slot($proc{$pid}->{slot});
+       }
 -      Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
 -                               ++$Jobstep->{'failures'},
 -                               $temporary_fail ? 'temporary' : 'permanent',
 -                               $elapsed));
++      Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
++                                ++$Jobstep->{'failures'},
++                                $temporary_fail ? 'temporary' : 'permanent',
++                                $elapsed));
+       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+         # Give up on this task, and the whole job
+         $main::success = 0;
+       }
+       # Put this task back on the todo queue
 -      push @jobstep_todo, $jobstepid;
++      push @jobstep_todo, $jobstepidx;
+       $Job->{'tasks_summary'}->{'failed'}++;
      }
-     # Put this task back on the todo queue
-     push @jobstep_todo, $jobstepidx;
-     $Job->{'tasks_summary'}->{'failed'}++;
-   }
-   else
-   {
-     ++$thisround_succeeded;
-     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
-     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
-     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
-     push @jobstep_done, $jobstepidx;
-     Log ($jobstepidx, "success in $elapsed seconds");
-   }
-   $Jobstep->{exitcode} = $childstatus;
-   $Jobstep->{finishtime} = time;
-   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
-   $Jobstep->{'arvados_task'}->save;
-   process_stderr_final ($jobstepidx);
-   Log ($jobstepidx, sprintf("task output (%d bytes): %s",
-                            length($Jobstep->{'arvados_task'}->{output}),
-                            $Jobstep->{'arvados_task'}->{output}));
-   close $reader{$jobstepidx};
-   delete $reader{$jobstepidx};
-   delete $slot[$proc{$pid}->{slot}]->{pid};
-   push @freeslot, $proc{$pid}->{slot};
-   delete $proc{$pid};
-   if ($task_success) {
-     # Load new tasks
-     my $newtask_list = [];
-     my $newtask_results;
-     do {
-       $newtask_results = api_call(
-         "job_tasks/list",
-         'where' => {
-           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-         },
-         'order' => 'qsequence',
-         'offset' => scalar(@$newtask_list),
-       );
-       push(@$newtask_list, @{$newtask_results->{items}});
-     } while (@{$newtask_results->{items}});
-     foreach my $arvados_task (@$newtask_list) {
-       my $jobstep = {
-         'level' => $arvados_task->{'sequence'},
-         'failures' => 0,
-         'arvados_task' => $arvados_task
-       };
-       push @jobstep, $jobstep;
-       push @jobstep_todo, $#jobstep;
+     else
+     {
+       ++$thisround_succeeded;
+       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
+       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
 -      push @jobstep_done, $jobstepid;
 -      Log ($jobstepid, "success in $elapsed seconds");
++      push @jobstep_done, $jobstepidx;
++      Log ($jobstepidx, "success in $elapsed seconds");
      }
 -    process_stderr ($jobstepid, $task_success);
 -    Log ($jobstepid, sprintf("task output (%d bytes): %s",
 -                             length($Jobstep->{'arvados_task'}->{output}),
 -                             $Jobstep->{'arvados_task'}->{output}));
+     $Jobstep->{exitcode} = $childstatus;
+     $Jobstep->{finishtime} = time;
+     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
+     $Jobstep->{'arvados_task'}->save;
 -    close $reader{$jobstepid};
 -    delete $reader{$jobstepid};
++    process_stderr_final ($jobstepidx);
++    Log ($jobstepidx, sprintf("task output (%d bytes): %s",
++                              length($Jobstep->{'arvados_task'}->{output}),
++                              $Jobstep->{'arvados_task'}->{output}));
++    close $reader{$jobstepidx};
++    delete $reader{$jobstepidx};
+     delete $slot[$proc{$pid}->{slot}]->{pid};
+     push @freeslot, $proc{$pid}->{slot};
+     delete $proc{$pid};
+     if ($task_success) {
+       # Load new tasks
+       my $newtask_list = [];
+       my $newtask_results;
+       do {
+         $newtask_results = api_call(
+           "job_tasks/list",
+           'where' => {
+             'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+           },
+           'order' => 'qsequence',
+           'offset' => scalar(@$newtask_list),
+             );
+         push(@$newtask_list, @{$newtask_results->{items}});
+       } while (@{$newtask_results->{items}});
+       foreach my $arvados_task (@$newtask_list) {
+         my $jobstep = {
+           'level' => $arvados_task->{'sequence'},
+           'failures' => 0,
+           'arvados_task' => $arvados_task
+         };
+         push @jobstep, $jobstep;
+         push @jobstep_todo, $#jobstep;
+       }
+     }
+     $progress_is_dirty = 1;
    }
  
-   $progress_is_dirty = 1;
-   1;
+   return $children_reaped;
  }
  
  sub check_refresh_wanted
  {
    my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
 -  if (@stat && $stat[9] > $latest_refresh) {
 +  if (@stat &&
 +      $stat[9] > $latest_refresh &&
 +      # ...and we have actually locked the job record...
 +      $job_id eq $Job->{'uuid'}) {
      $latest_refresh = scalar time;
      my $Job2 = api_call("jobs/get", uuid => $jobspec);
      for my $attr ('cancelled_at',
@@@ -1292,10 -1351,13 +1304,13 @@@ sub check_squeu
    # squeue check interval (15s) this should make the squeue check an
    # infrequent event.
    my $silent_procs = 0;
-   for my $procinfo (values %proc)
 -  for my $js (map {$jobstep[$_->{jobstep}]} values %proc)
++  for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
    {
-     my $jobstep = $jobstep[$procinfo->{jobstepidx}];
-     if ($jobstep->{stderr_at} < $last_squeue_check)
+     if (!exists($js->{stderr_at}))
+     {
+       $js->{stderr_at} = 0;
+     }
+     if ($js->{stderr_at} < $last_squeue_check)
      {
        $silent_procs++;
      }
    # use killem() on procs whose killtime is reached
    while (my ($pid, $procinfo) = each %proc)
    {
-     my $jobstep = $jobstep[$procinfo->{jobstepidx}];
 -    my $js = $jobstep[$procinfo->{jobstep}];
++    my $js = $jobstep[$procinfo->{jobstepidx}];
      if (exists $procinfo->{killtime}
          && $procinfo->{killtime} <= time
-         && $jobstep->{stderr_at} < $last_squeue_check)
+         && $js->{stderr_at} < $last_squeue_check)
      {
        my $sincewhen = "";
-       if ($jobstep->{stderr_at}) {
-         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+       if ($js->{stderr_at}) {
+         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
        }
 -      Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
 +      Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
        killem ($pid);
      }
    }
    # Check for child procs >60s old and not mentioned by squeue.
    while (my ($pid, $procinfo) = each %proc)
    {
 -    my $js = $jobstep[$procinfo->{jobstep}];
      if ($procinfo->{time} < time - 60
          && $procinfo->{jobstepname}
          && !exists $ok{$procinfo->{jobstepname}}
        # error/delay has caused the task to die without notifying srun,
        # and we'll kill srun ourselves.
        $procinfo->{killtime} = time + 30;
 -      Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
 +      Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
      }
    }
  }
@@@ -1383,90 -1446,81 +1398,100 @@@ sub release_allocatio
  sub readfrompipes
  {
    my $gotsome = 0;
 -  foreach my $job (keys %reader)
+   my %fd_job;
+   my $sel = IO::Select->new();
 -    my $fd = $reader{$job};
 +  foreach my $jobstepidx (keys %reader)
+   {
 -    $fd_job{$fd} = $job;
++    my $fd = $reader{$jobstepidx};
+     $sel->add($fd);
++    $fd_job{$fd} = $jobstepidx;
++
++    if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
++      $sel->add($stdout_fd);
++      $fd_job{$stdout_fd} = $jobstepidx;
++    }
+   }
+   # select on all reader fds with 0.1s timeout
+   my @ready_fds = $sel->can_read(0.1);
+   foreach my $fd (@ready_fds)
    {
      my $buf;
-     if ($jobstep[$jobstepidx]->{stdout_r} &&
-         0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536))
+     if (0 < sysread ($fd, $buf, 65536))
      {
++      $gotsome = 1;
        print STDERR $buf if $ENV{CRUNCH_DEBUG};
-       if (exists $jobstep[$jobstepidx]->{stdout_captured}) {
 -      my $job = $fd_job{$fd};
 -      $jobstep[$job]->{stderr_at} = time;
 -      $jobstep[$job]->{stderr} .= $buf;
++
++      my $jobstepidx = $fd_job{$fd};
++      if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
 +        $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
++        next;
 +      }
-       $gotsome = 1;
-     }
-     if (0 < sysread ($reader{$jobstepidx}, $buf, 65536))
-     {
-       print STDERR $buf if $ENV{CRUNCH_DEBUG};
++
 +      $jobstep[$jobstepidx]->{stderr_at} = time;
 +      $jobstep[$jobstepidx]->{stderr} .= $buf;
-       if (exists $jobstep[$jobstepidx]->{stderr_captured}) {
-         $jobstep[$jobstepidx]->{stderr_captured} .= $buf;
-       }
-       $gotsome = 1;
  
        # Consume everything up to the last \n
 -      preprocess_stderr ($job);
 +      preprocess_stderr ($jobstepidx);
  
 -      if (length ($jobstep[$job]->{stderr}) > 16384)
 +      if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
        {
          # If we get a lot of stderr without a newline, chop off the
          # front to avoid letting our buffer grow indefinitely.
 -        substr ($jobstep[$job]->{stderr},
 -                0, length($jobstep[$job]->{stderr}) - 8192) = "";
 +        substr ($jobstep[$jobstepidx]->{stderr},
 +                0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
        }
 -      $gotsome = 1;
      }
    }
    return $gotsome;
  }
  
  
 +# Consume all full lines of stderr for a jobstep. Everything after the
 +# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
 +# returning.
  sub preprocess_stderr
  {
 -  my $job = shift;
 +  my $jobstepidx = shift;
  
 -  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
 +  while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
      my $line = $1;
 -    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
 -    Log ($job, "stderr $line");
 +    substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
 +    Log ($jobstepidx, "stderr $line");
      if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
        # whoa.
        $main::please_freeze = 1;
      }
 -    elsif ($line =~ /srun: error: Node failure on/) {
 -      my $job_slot_index = $jobstep[$job]->{slotindex};
 +    elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
 +      # Skip the following tempfail checks if this srun proc isn't
 +      # attached to a particular worker slot.
 +    }
 +    elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
 +      my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
 +      my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
        $slot[$job_slot_index]->{node}->{fail_count}++;
 -      $jobstep[$job]->{tempfail} = 1;
 +      $jobstep[$jobstepidx]->{tempfail} = 1;
        ban_node_by_slot($job_slot_index);
      }
      elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
 -      $jobstep[$job]->{tempfail} = 1;
 -      ban_node_by_slot($jobstep[$job]->{slotindex});
 +      $jobstep[$jobstepidx]->{tempfail} = 1;
 +      ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
      }
      elsif ($line =~ /arvados\.errors\.Keep/) {
 -      $jobstep[$job]->{tempfail} = 1;
 +      $jobstep[$jobstepidx]->{tempfail} = 1;
      }
    }
  }
  
  
 -sub process_stderr
 +sub process_stderr_final
  {
 -  my $job = shift;
 -  my $task_success = shift;
 -  preprocess_stderr ($job);
 +  my $jobstepidx = shift;
 +  preprocess_stderr ($jobstepidx);
  
    map {
 -    Log ($job, "stderr $_");
 -  } split ("\n", $jobstep[$job]->{stderr});
 +    Log ($jobstepidx, "stderr $_");
 +  } split ("\n", $jobstep[$jobstepidx]->{stderr});
 +  $jobstep[$jobstepidx]->{stderr} = '';
  }
  
  sub fetch_block
@@@ -1604,7 -1658,7 +1629,7 @@@ sub kille
      }
      if (!exists $proc{$_}->{"sent_$sig"})
      {
 -      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
 +      Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
        kill $sig, $_;
        select (undef, undef, undef, 0.1);
        if ($sig == 2)
@@@ -1728,21 -1782,16 +1753,21 @@@ sub log_writer_is_active() 
    return $log_pipe_pid;
  }
  
 -sub Log                               # ($jobstep_id, $logmessage)
 +sub Log                               # ($jobstepidx, $logmessage)
  {
 -  if ($_[1] =~ /\n/) {
 +  my ($jobstepidx, $logmessage) = @_;
 +  if ($logmessage =~ /\n/) {
      for my $line (split (/\n/, $_[1])) {
 -      Log ($_[0], $line);
 +      Log ($jobstepidx, $line);
      }
      return;
    }
    my $fh = select STDERR; $|=1; select $fh;
 -  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
 +  my $task_qseq = '';
 +  if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
 +    $task_qseq = $jobstepidx;
 +  }
 +  my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
    $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
    $message .= "\n";
    my $datetime;
@@@ -1866,83 -1915,6 +1891,83 @@@ sub freezeunquot
  }
  
  
 +sub srun_sync
 +{
 +  my $srunargs = shift;
 +  my $execargs = shift;
 +  my $opts = shift || {};
 +  my $stdin = shift;
 +
 +  my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
 +  Log (undef, "$label: start");
 +
 +  my ($stderr_r, $stderr_w);
 +  pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
 +
 +  my ($stdout_r, $stdout_w);
 +  pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
 +
 +  my $srunpid = fork();
 +  if ($srunpid == 0)
 +  {
 +    close($stderr_r);
 +    close($stdout_r);
 +    fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
 +    fcntl($stdout_w, F_SETFL, 0) or croak($!);
 +    open(STDERR, ">&", $stderr_w);
 +    open(STDOUT, ">&", $stdout_w);
 +    srun ($srunargs, $execargs, $opts, $stdin);
 +    exit (1);
 +  }
 +  close($stderr_w);
 +  close($stdout_w);
 +
 +  set_nonblocking($stderr_r);
 +  set_nonblocking($stdout_r);
 +
 +  # Add entries to @jobstep and %proc so check_squeue() and
 +  # freeze_if_want_freeze() can treat it like a job task process.
 +  push @jobstep, {
 +    stderr => '',
 +    stderr_at => 0,
 +    stderr_captured => '',
 +    stdout_r => $stdout_r,
 +    stdout_captured => '',
 +  };
 +  my $jobstepidx = $#jobstep;
 +  $proc{$srunpid} = {
 +    jobstepidx => $jobstepidx,
 +  };
 +  $reader{$jobstepidx} = $stderr_r;
 +
 +  while ($srunpid != waitpid ($srunpid, WNOHANG)) {
 +    my $busy = readfrompipes();
 +    if (!$busy || ($latest_refresh + 2 < scalar time)) {
 +      check_refresh_wanted();
 +      check_squeue();
 +    }
 +    if (!$busy) {
 +      select(undef, undef, undef, 0.1);
 +    }
 +    killem(keys %proc) if $main::please_freeze;
 +  }
 +  my $exited = $?;
 +
 +  1 while readfrompipes();
 +  process_stderr_final ($jobstepidx);
 +
 +  Log (undef, "$label: exit ".exit_status_s($exited));
 +
 +  close($stdout_r);
 +  close($stderr_r);
 +  delete $proc{$srunpid};
 +  delete $reader{$jobstepidx};
 +
 +  my $j = pop @jobstep;
 +  return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
 +}
 +
 +
  sub srun
  {
    my $srunargs = shift;