4410: crunch-job fixups from code review.
[arvados.git] / sdk / cli / bin / crunch-job
index 963bf32a953444b5ad161c984385905bcf4dd172..2246c86fb62907eb61a279cbac9d2d97c242aa1b 100755 (executable)
@@ -10,12 +10,14 @@ crunch-job: Execute job steps, save snapshots as requested, collate output.
 Obtain job details from Arvados, run tasks on compute nodes (typically
 invoked by scheduler on controller):
 
- crunch-job --job x-y-z
+ crunch-job --job x-y-z --git-dir /path/to/repo/.git
 
 Obtain job details from command line, run tasks on local machine
 (typically invoked by application or developer on VM):
 
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
+ crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
+
+ crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
 
 =head1 OPTIONS
 
@@ -27,7 +29,9 @@ If the job is already locked, steal the lock and run it anyway.
 
 =item --git-dir
 
-Path to .git directory where the specified commit is found.
+Path to a .git directory (or a git URL) where the commit given in the
+job's C<script_version> attribute is to be found. If this is I<not>
+given, the job's C<repository> attribute will be used.
 
 =item --job-api-token
 
@@ -39,6 +43,11 @@ Do not clear per-job/task temporary directories during initial job
 setup. This can speed up development and debugging when running jobs
 locally.
 
+=item --job
+
+UUID of the job to run, or a JSON-encoded job resource without a
+UUID. If the latter is given, a new job object will be created.
+
 =back
 
 =head1 RUNNING JOBS LOCALLY
@@ -74,17 +83,22 @@ behavior (e.g., cancel job if cancelled_at becomes non-nil).
 
 use strict;
 use POSIX ':sys_wait_h';
+use POSIX qw(strftime);
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Arvados;
+use Cwd qw(realpath);
+use Data::Dumper;
 use Digest::MD5 qw(md5_hex);
 use Getopt::Long;
 use IPC::Open2;
 use IO::Select;
 use File::Temp;
 use Fcntl ':flock';
-use File::Path qw( make_path );
+use File::Path qw( make_path remove_tree );
 
+use constant TASK_TEMPFAIL => 111;
 use constant EX_TEMPFAIL => 75;
+use constant EX_RETRY_UNLOCKED => 93;
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -105,27 +119,28 @@ $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
 mkdir ($ENV{"JOB_WORK"});
 
+my %proc;
 my $force_unlock;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
 my $no_clear_tmp;
 my $resume_stash;
+my $docker_bin = "/usr/bin/docker.io";
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
            'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
+           'docker-bin=s' => \$docker_bin,
     );
 
 if (defined $job_api_token) {
   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
 }
 
-my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
-my $local_job = !$job_has_uuid;
+my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
 
 
 $SIG{'USR1'} = sub
@@ -137,47 +152,60 @@ $SIG{'USR2'} = sub
   $main::ENV{CRUNCH_DEBUG} = 0;
 };
 
-
-
 my $arv = Arvados->new('apiVersion' => 'v1');
-my $local_logfile;
-
-my $User = $arv->{'users'}->{'current'}->execute;
 
-my $Job = {};
+my $Job;
 my $job_id;
 my $dbh;
 my $sth;
-if ($job_has_uuid)
+my @jobstep;
+
+my $local_job;
+if ($jobspec =~ /^[-a-z\d]+$/)
 {
-  $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+  # $jobspec is an Arvados UUID, not a JSON job specification
+  $Job = api_call("jobs/get", uuid => $jobspec);
+  $local_job = 0;
+}
+else
+{
+  $Job = JSON::decode_json($jobspec);
+  $local_job = 1;
+}
+
+
+# Make sure our workers (our slurm nodes, localhost, or whatever) are
+# at least able to run basic commands: they aren't down or severely
+# misconfigured.
+my $cmd = ['true'];
+if ($Job->{docker_image_locator}) {
+  $cmd = [$docker_bin, 'ps', '-q'];
+}
+Log(undef, "Sanity check is `@$cmd`");
+srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+     $cmd,
+     {fork => 1});
+if ($? != 0) {
+  Log(undef, "Sanity check failed: ".exit_status_s($?));
+  exit EX_TEMPFAIL;
+}
+Log(undef, "Sanity check OK");
+
+
+my $User = api_call("users/current");
+
+if (!$local_job) {
   if (!$force_unlock) {
-    # If some other crunch-job process has grabbed this job (or we see
-    # other evidence that the job is already underway) we exit
-    # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
-    # mark the job as failed.
-    if ($Job->{'is_locked_by_uuid'}) {
-      Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
+    # Claim this job, and make sure nobody else does
+    eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
+    if ($@) {
+      Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
       exit EX_TEMPFAIL;
-    }
-    if ($Job->{'success'} ne undef) {
-      Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'running'}) {
-      Log(undef, "Job 'running' flag is already set");
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'started_at'}) {
-      Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
-      exit EX_TEMPFAIL;
-    }
+    };
   }
 }
 else
 {
-  $Job = JSON::decode_json($jobspec);
-
   if (!$resume_stash)
   {
     map { croak ("No $_ specified") unless $Job->{$_} }
@@ -186,20 +214,29 @@ else
 
   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
   $Job->{'started_at'} = gmtime;
+  $Job->{'state'} = 'Running';
 
-  $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
-
-  $job_has_uuid = 1;
+  $Job = api_call("jobs/create", job => $Job);
 }
 $job_id = $Job->{'uuid'};
 
 my $keep_logfile = $job_id . '.log.txt';
-$local_logfile = File::Temp->new();
+log_writer_start($keep_logfile);
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
 
+my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
+if ($? == 0) {
+  $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
+  chomp($gem_versions);
+  chop($gem_versions);  # Closing parentheses
+} else {
+  $gem_versions = "";
+}
+Log(undef,
+    "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
 
 Log (undef, "check slurm allocation");
 my @slot;
@@ -276,25 +313,11 @@ foreach (@sinfo)
 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
 
 
-
-my $jobmanager_id;
-if ($job_has_uuid)
-{
-  # Claim this job, and make sure nobody else does
-  unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
-          $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
-    Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
-    exit EX_TEMPFAIL;
-  }
-  $Job->update_attributes('started_at' => scalar gmtime,
-                          'running' => 1,
-                          'success' => undef,
-                          'tasks_summary' => { 'failed' => 0,
-                                               'todo' => 1,
-                                               'running' => 0,
-                                               'done' => 0 });
-}
-
+$Job->update_attributes(
+  'tasks_summary' => { 'failed' => 0,
+                       'todo' => 1,
+                       'running' => 0,
+                       'done' => 0 });
 
 Log (undef, "start");
 $SIG{'INT'} = sub { $main::please_freeze = 1; };
@@ -316,14 +339,11 @@ $ENV{"CRUNCH_JOB_UUID"} = $job_id;
 $ENV{"JOB_UUID"} = $job_id;
 
 
-my @jobstep;
 my @jobstep_todo = ();
 my @jobstep_done = ();
 my @jobstep_tomerge = ();
 my $jobstep_tomerge_level = 0;
-my $squeue_checked;
-my $squeue_kill_checked;
-my $output_in_keep = 0;
+my $squeue_checked = 0;
 my $latest_refresh = scalar time;
 
 
@@ -334,12 +354,12 @@ if (defined $Job->{thawedfromkey})
 }
 else
 {
-  my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
+  my $first_task = api_call("job_tasks/create", job_task => {
     'job_uuid' => $Job->{'uuid'},
     'sequence' => 0,
     'qsequence' => 0,
     'parameters' => {},
-                                                          });
+  });
   push @jobstep, { 'level' => 0,
                   'failures' => 0,
                    'arvados_task' => $first_task,
@@ -353,166 +373,36 @@ if (!$have_slurm)
   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
 }
 
+my $build_script = handle_readall(\*DATA);
+my $nodelist = join(",", @node);
+my $git_tar_count = 0;
 
-my $build_script;
+if (!defined $no_clear_tmp) {
+  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
+  Log (undef, "Clean work dirs");
 
-
-$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
-
-my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
-if ($skip_install)
-{
-  if (!defined $no_clear_tmp) {
-    my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
-    system($clear_tmp_cmd) == 0
-       or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
-  }
-  $ENV{"CRUNCH_SRC"} = $Job->{script_version};
-  for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
-    if (-d $src_path) {
-      system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
-          or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
-      system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
-          == 0
-          or croak ("setup.py in $src_path failed: exit ".($?>>8));
-    }
-  }
-}
-else
-{
-  do {
-    local $/ = undef;
-    $build_script = <DATA>;
-  };
-  Log (undef, "Install revision ".$Job->{script_version});
-  my $nodelist = join(",", @node);
-
-  if (!defined $no_clear_tmp) {
-    # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
-
-    my $cleanpid = fork();
-    if ($cleanpid == 0)
-    {
-      srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
-      exit (1);
-    }
-    while (1)
-    {
-      last if $cleanpid == waitpid (-1, WNOHANG);
-      freeze_if_want_freeze ($cleanpid);
-      select (undef, undef, undef, 0.1);
-    }
-    Log (undef, "Clean-work-dir exited $?");
-  }
-
-  # Install requested code version
-
-  my @execargs;
-  my @srunargs = ("srun",
-                 "--nodelist=$nodelist",
-                 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
-
-  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
-  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
-
-  my $commit;
-  my $git_archive;
-  my $treeish = $Job->{'script_version'};
-
-  # If we're running under crunch-dispatch, it will have pulled the
-  # appropriate source tree into its own repository, and given us that
-  # repo's path as $git_dir. If we're running a "local" job, and a
-  # script_version was specified, it's up to the user to provide the
-  # full path to a local repository in Job->{repository}.
-  #
-  # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
-  # git-archive --remote where appropriate.
-  #
-  # TODO: Accept a locally-hosted Arvados repository by name or
-  # UUID. Use arvados.v1.repositories.list or .get to figure out the
-  # appropriate fetch-url.
-  my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
-
-  $ENV{"CRUNCH_SRC_URL"} = $repo;
-
-  if (-d "$repo/.git") {
-    # We were given a working directory, but we are only interested in
-    # the index.
-    $repo = "$repo/.git";
-  }
-
-  # If this looks like a subversion r#, look for it in git-svn commit messages
-
-  if ($treeish =~ m{^\d{1,4}$}) {
-    my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
-    chomp $gitlog;
-    if ($gitlog =~ /^[a-f0-9]{40}$/) {
-      $commit = $gitlog;
-      Log (undef, "Using commit $commit for script_version $treeish");
-    }
-  }
-
-  # If that didn't work, try asking git to look it up as a tree-ish.
-
-  if (!defined $commit) {
-    my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
-    chomp $found;
-    if ($found =~ /^[0-9a-f]{40}$/s) {
-      $commit = $found;
-      if ($commit ne $treeish) {
-       # Make sure we record the real commit id in the database,
-       # frozentokey, logs, etc. -- instead of an abbreviation or a
-       # branch name which can become ambiguous or point to a
-       # different commit in the future.
-       $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
-       Log (undef, "Using commit $commit for tree-ish $treeish");
-        if ($commit ne $treeish) {
-          $Job->{'script_version'} = $commit;
-          !$job_has_uuid or
-              $Job->update_attributes('script_version' => $commit) or
-              croak("Error while updating job");
-        }
-      }
-    }
-  }
-
-  if (defined $commit) {
-    $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
-    @execargs = ("sh", "-c",
-                "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
-    $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
-  }
-  else {
-    croak ("could not figure out commit id for $treeish");
-  }
-
-  # Note: this section is almost certainly unnecessary if we're
-  # running tasks in docker containers.
-  my $installpid = fork();
-  if ($installpid == 0)
+  my $cleanpid = fork();
+  if ($cleanpid == 0)
   {
-    srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
+    # Find FUSE mounts that look like Keep mounts (the mount path has the
+    # word "keep") and unmount them.  Then clean up work directories.
+    # TODO: When #5036 is done and widely deployed, we can get rid of the
+    # regular expression and just unmount everything with type fuse.keep.
+    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+          ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
     exit (1);
   }
   while (1)
   {
-    last if $installpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($installpid);
+    last if $cleanpid == waitpid (-1, WNOHANG);
+    freeze_if_want_freeze ($cleanpid);
     select (undef, undef, undef, 0.1);
   }
-  Log (undef, "Install exited $?");
-}
-
-if (!$have_slurm)
-{
-  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
-  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+  Log (undef, "Cleanup command exited ".exit_status_s($?));
 }
 
 # If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_stream, $docker_hash);
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
 if ($docker_locator = $Job->{docker_image_locator}) {
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
@@ -521,7 +411,7 @@ if ($docker_locator = $Job->{docker_image_locator}) {
   }
   $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
+if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
 fi
 };
@@ -540,7 +430,273 @@ fi
   }
   if ($? != 0)
   {
-    croak("Installing Docker image from $docker_locator returned exit code $?");
+    croak("Installing Docker image from $docker_locator exited "
+          .exit_status_s($?));
+  }
+
+  # Determine whether this version of Docker supports memory+swap limits.
+  srun(["srun", "--nodelist=" . $node[0]],
+       ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
+      {fork => 1});
+  $docker_limitmem = ($? == 0);
+
+  if ($Job->{arvados_sdk_version}) {
+    # The job also specifies an Arvados SDK version.  Add the SDKs to the
+    # tar file for the build script to install.
+    Log(undef, sprintf("Packing Arvados SDK version %s for installation",
+                       $Job->{arvados_sdk_version}));
+    add_git_archive("git", "--git-dir=$git_dir", "archive",
+                    "--prefix=.arvados.sdk/",
+                    $Job->{arvados_sdk_version}, "sdk");
+  }
+}
+
+if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
+  # If script_version looks like an absolute path, *and* the --git-dir
+  # argument was not given -- which implies we were not invoked by
+  # crunch-dispatch -- we will use the given path as a working
+  # directory instead of resolving script_version to a git commit (or
+  # doing anything else with git).
+  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
+  $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
+}
+else {
+  # Resolve the given script_version to a git commit sha1. Also, if
+  # the repository is remote, clone it into our local filesystem: this
+  # ensures "git archive" will work, and is necessary to reliably
+  # resolve a symbolic script_version like "master^".
+  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
+
+  Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
+
+  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
+
+  # If we're running under crunch-dispatch, it will have already
+  # pulled the appropriate source tree into its own repository, and
+  # given us that repo's path as $git_dir.
+  #
+  # If we're running a "local" job, we might have to fetch content
+  # from a remote repository.
+  #
+  # (Currently crunch-dispatch gives a local path with --git-dir, but
+  # we might as well accept URLs there too in case it changes its
+  # mind.)
+  my $repo = $git_dir || $Job->{'repository'};
+
+  # Repository can be remote or local. If remote, we'll need to fetch it
+  # to a local dir before doing `git log` et al.
+  my $repo_location;
+
+  if ($repo =~ m{://|^[^/]*:}) {
+    # $repo is a git url we can clone, like git:// or https:// or
+    # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
+    # not recognized here because distinguishing that from a local
+    # path is too fragile. If you really need something strange here,
+    # use the ssh:// form.
+    $repo_location = 'remote';
+  } elsif ($repo =~ m{^\.*/}) {
+    # $repo is a local path to a git index. We'll also resolve ../foo
+    # to ../foo/.git if the latter is a directory. To help
+    # disambiguate local paths from named hosted repositories, this
+    # form must be given as ./ or ../ if it's a relative path.
+    if (-d "$repo/.git") {
+      $repo = "$repo/.git";
+    }
+    $repo_location = 'local';
+  } else {
+    # $repo is none of the above. It must be the name of a hosted
+    # repository.
+    my $arv_repo_list = api_call("repositories/list",
+                                 'filters' => [['name','=',$repo]]);
+    my @repos_found = @{$arv_repo_list->{'items'}};
+    my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
+    if ($n_found > 0) {
+      Log(undef, "Repository '$repo' -> "
+          . join(", ", map { $_->{'uuid'} } @repos_found));
+    }
+    if ($n_found != 1) {
+      croak("Error: Found $n_found repositories with name '$repo'.");
+    }
+    $repo = $repos_found[0]->{'fetch_url'};
+    $repo_location = 'remote';
+  }
+  Log(undef, "Using $repo_location repository '$repo'");
+  $ENV{"CRUNCH_SRC_URL"} = $repo;
+
+  # Resolve given script_version (we'll call that $treeish here) to a
+  # commit sha1 ($commit).
+  my $treeish = $Job->{'script_version'};
+  my $commit;
+  if ($repo_location eq 'remote') {
+    # We minimize excess object-fetching by re-using the same bare
+    # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
+    # just keep adding remotes to it as needed.
+    my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
+    my $gitcmd = "git --git-dir=\Q$local_repo\E";
+
+    # Set up our local repo for caching remote objects, making
+    # archives, etc.
+    if (!-d $local_repo) {
+      make_path($local_repo) or croak("Error: could not create $local_repo");
+    }
+    # This works (exits 0 and doesn't delete fetched objects) even
+    # if $local_repo is already initialized:
+    `$gitcmd init --bare`;
+    if ($?) {
+      croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
+    }
+
+    # If $treeish looks like a hash (or abbrev hash) we look it up in
+    # our local cache first, since that's cheaper. (We don't want to
+    # do that with tags/branches though -- those change over time, so
+    # they should always be resolved by the remote repo.)
+    if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
+      # Hide stderr because it's normal for this to fail:
+      my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
+      if ($? == 0 &&
+          # Careful not to resolve a branch named abcdeff to commit 1234567:
+          $sha1 =~ /^$treeish/ &&
+          $sha1 =~ /^([0-9a-f]{40})$/s) {
+        $commit = $1;
+        Log(undef, "Commit $commit already present in $local_repo");
+      }
+    }
+
+    if (!defined $commit) {
+      # If $treeish isn't just a hash or abbrev hash, or isn't here
+      # yet, we need to fetch the remote to resolve it correctly.
+
+      # First, remove all local heads. This prevents a name that does
+      # not exist on the remote from resolving to (or colliding with)
+      # a previously fetched branch or tag (possibly from a different
+      # remote).
+      remove_tree("$local_repo/refs/heads", {keep_root => 1});
+
+      Log(undef, "Fetching objects from $repo to $local_repo");
+      `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
+      if ($?) {
+        croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
+      }
+    }
+
+    # Now that the data is all here, we will use our local repo for
+    # the rest of our git activities.
+    $repo = $local_repo;
+  }
+
+  my $gitcmd = "git --git-dir=\Q$repo\E";
+  my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
+  unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
+    croak("`$gitcmd rev-list` exited "
+          .exit_status_s($?)
+          .", '$treeish' not found. Giving up.");
+  }
+  $commit = $1;
+  Log(undef, "Version $treeish is commit $commit");
+
+  if ($commit ne $Job->{'script_version'}) {
+    # Record the real commit id in the database, frozentokey, logs,
+    # etc. -- instead of an abbreviation or a branch name which can
+    # become ambiguous or point to a different commit in the future.
+    if (!$Job->update_attributes('script_version' => $commit)) {
+      croak("Error: failed to update job's script_version attribute");
+    }
+  }
+
+  $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
+  add_git_archive("$gitcmd archive ''\Q$commit\E");
+}
+
+my $git_archive = combined_git_archive();
+if (!defined $git_archive) {
+  Log(undef, "Skip install phase (no git archive)");
+  if ($have_slurm) {
+    Log(undef, "Warning: This probably means workers have no source tree!");
+  }
+}
+else {
+  my $install_exited;
+  my $install_script_tries_left = 3;
+  for (my $attempts = 0; $attempts < 3; $attempts++) {
+    Log(undef, "Run install script on all workers");
+
+    my @srunargs = ("srun",
+                    "--nodelist=$nodelist",
+                    "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
+    my @execargs = ("sh", "-c",
+                    "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+
+    $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
+    my ($install_stderr_r, $install_stderr_w);
+    pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
+    set_nonblocking($install_stderr_r);
+    my $installpid = fork();
+    if ($installpid == 0)
+    {
+      close($install_stderr_r);
+      fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+      open(STDOUT, ">&", $install_stderr_w);
+      open(STDERR, ">&", $install_stderr_w);
+      srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
+      exit (1);
+    }
+    close($install_stderr_w);
+    # Tell freeze_if_want_freeze how to kill the child, otherwise the
+    # "waitpid(installpid)" loop won't get interrupted by a freeze:
+    $proc{$installpid} = {};
+    my $stderr_buf = '';
+    # Track whether anything appears on stderr other than slurm errors
+    # ("srun: ...") and the "starting: ..." message printed by the
+    # srun subroutine itself:
+    my $stderr_anything_from_script = 0;
+    my $match_our_own_errors = '^(srun: error: |starting: \[)';
+    while ($installpid != waitpid(-1, WNOHANG)) {
+      freeze_if_want_freeze ($installpid);
+      # Wait up to 0.1 seconds for something to appear on stderr, then
+      # do a non-blocking read.
+      my $bits = fhbits($install_stderr_r);
+      select ($bits, undef, $bits, 0.1);
+      if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
+      {
+        while ($stderr_buf =~ /^(.*?)\n/) {
+          my $line = $1;
+          substr $stderr_buf, 0, 1+length($line), "";
+          Log(undef, "stderr $line");
+          if ($line !~ /$match_our_own_errors/) {
+            $stderr_anything_from_script = 1;
+          }
+        }
+      }
+    }
+    delete $proc{$installpid};
+    $install_exited = $?;
+    close($install_stderr_r);
+    if (length($stderr_buf) > 0) {
+      if ($stderr_buf !~ /$match_our_own_errors/) {
+        $stderr_anything_from_script = 1;
+      }
+      Log(undef, "stderr $stderr_buf")
+    }
+
+    Log (undef, "Install script exited ".exit_status_s($install_exited));
+    last if $install_exited == 0 || $main::please_freeze;
+    # If the install script fails but doesn't print an error message,
+    # the next thing anyone is likely to do is just run it again in
+    # case it was a transient problem like "slurm communication fails
+    # because the network isn't reliable enough". So we'll just do
+    # that ourselves (up to 3 attempts in total). OTOH, if there is an
+    # error message, the problem is more likely to have a real fix and
+    # we should fail the job so the fixing process can start, instead
+    # of doing 2 more attempts.
+    last if $stderr_anything_from_script;
+  }
+
+  foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
+    unlink($tar_filename);
+  }
+
+  if ($install_exited != 0) {
+    croak("Giving up");
   }
 }
 
@@ -566,16 +722,48 @@ ONELEVEL:
 my $thisround_succeeded = 0;
 my $thisround_failed = 0;
 my $thisround_failed_multiple = 0;
+my $working_slot_count = scalar(@slot);
 
 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
 my $level = $jobstep[$jobstep_todo[0]]->{level};
-Log (undef, "start level $level");
 
+my $initial_tasks_this_level = 0;
+foreach my $id (@jobstep_todo) {
+  $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
+}
 
+# If the number of tasks scheduled at this level #T is smaller than the number
+# of slots available #S, only use the first #T slots, or the first slot on
+# each node, whichever number is greater.
+#
+# When we dispatch tasks later, we'll allocate whole-node resources like RAM
+# based on these numbers.  Using fewer slots makes more resources available
+# to each individual task, which should normally be a better strategy when
+# there are fewer of them running with less parallelism.
+#
+# Note that this calculation is not redone if the initial tasks at
+# this level queue more tasks at the same level.  This may harm
+# overall task throughput for that level.
+my @freeslot;
+if ($initial_tasks_this_level < @node) {
+  @freeslot = (0..$#node);
+} elsif ($initial_tasks_this_level < @slot) {
+  @freeslot = (0..$initial_tasks_this_level - 1);
+} else {
+  @freeslot = (0..$#slot);
+}
+my $round_num_freeslots = scalar(@freeslot);
+
+my %round_max_slots = ();
+for (my $ii = $#freeslot; $ii >= 0; $ii--) {
+  my $this_slot = $slot[$freeslot[$ii]];
+  my $node_name = $this_slot->{node}->{name};
+  $round_max_slots{$node_name} ||= $this_slot->{cpu};
+  last if (scalar(keys(%round_max_slots)) >= @node);
+}
 
-my %proc;
-my @freeslot = (0..$#slot);
+Log(undef, "start level $level with $round_num_freeslots slots");
 my @holdslot;
 my %reader;
 my $progress_is_dirty = 1;
@@ -584,7 +772,6 @@ my $progress_stats_updated = 0;
 update_progress_stats();
 
 
-
 THISROUND:
 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 {
@@ -595,15 +782,15 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     next;
   }
 
-  pipe $reader{$id}, "writer" or croak ($!);
-  my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
-  fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+  pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
+  set_nonblocking($reader{$id});
 
   my $childslot = $freeslot[0];
   my $childnode = $slot[$childslot]->{node};
   my $childslotname = join (".",
                            $slot[$childslot]->{node}->{name},
                            $slot[$childslot]->{cpu});
+
   my $childpid = fork();
   if ($childpid == 0)
   {
@@ -633,11 +820,11 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
-    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
+    $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
     $ENV{"HOME"} = $ENV{"TASK_WORK"};
     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
-    $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+    $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
 
     $ENV{"GZIP"} = "-n";
@@ -648,58 +835,99 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
       "--job-name=$job_id.$id.$$",
        );
-    my $build_script_to_send = "";
     my $command =
        "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
-       ."&& cd $ENV{CRUNCH_TMP} ";
-    if ($build_script)
-    {
-      $build_script_to_send = $build_script;
-    }
-    $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+       ."&& cd $ENV{CRUNCH_TMP} "
+        # These environment variables get used explicitly later in
+        # $command.  No tool is expected to read these values directly.
+        .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
+        .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
+        ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
+        ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
+    $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_hash)
     {
-      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
-      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
+      my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
+      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+      # We only set memory limits if Docker lets us limit both memory and swap.
+      # Memory limits alone have been supported longer, but subprocesses tend
+      # to get SIGKILL if they exceed that without any swap limit set.
+      # See #5642 for additional background.
+      if ($docker_limitmem) {
+        $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
+      }
+
       # Dynamically configure the container to use the host system as its
       # DNS server.  Get the host's global addresses from the ip command,
       # and turn them into docker --dns options using gawk.
       $command .=
           q{$(ip -o address show scope global |
               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
+
+      # The source tree and $destdir directory (which we have
+      # installed on the worker host) are available in the container,
+      # under the same path.
+      $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
+      $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
+
+      # Currently, we make arv-mount's mount point appear at /keep
+      # inside the container (instead of using the same path as the
+      # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
+      # crunch scripts and utilities must not rely on this. They must
+      # use $TASK_KEEPMOUNT.
       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
-      $command .= "--env=\QHOME=/home/crunch\E ";
+      $ENV{TASK_KEEPMOUNT} = "/keep";
+
+      # TASK_WORK is almost exactly like a docker data volume: it
+      # starts out empty, is writable, and persists until no
+      # containers use it any more. We don't use --volumes-from to
+      # share it with other containers: it is only accessible to this
+      # task, and it goes away when this task stops.
+      #
+      # However, a docker data volume is writable only by root unless
+      # the mount point already happens to exist in the container with
+      # different permissions. Therefore, we [1] assume /tmp already
+      # exists in the image and is writable by the crunch user; [2]
+      # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
+      # writable if they are created by docker while setting up the
+      # other --volumes); and [3] create $TASK_WORK inside the
+      # container using $build_script.
+      $command .= "--volume=/tmp ";
+      $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
+      $ENV{"HOME"} = $ENV{"TASK_WORK"};
+      $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
+
+      # TODO: Share a single JOB_WORK volume across all task
+      # containers on a given worker node, and delete it when the job
+      # ends (and, in case that doesn't work, when the next job
+      # starts).
+      #
+      # For now, use the same approach as TASK_WORK above.
+      $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
+
       while (my ($env_key, $env_val) = each %ENV)
       {
-        if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
-          if ($env_key eq "TASK_KEEPMOUNT") {
-            $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
-          }
-          else {
-            $command .= "--env=\Q$env_key=$env_val\E ";
-          }
+        if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
+          $command .= "--env=\Q$env_key=$env_val\E ";
         }
       }
-      $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
-      $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
+      $command .= "--env=\QHOME=$ENV{HOME}\E ";
       $command .= "\Q$docker_hash\E ";
       $command .= "stdbuf --output=0 --error=0 ";
-      $command .= "perl - ";
-      $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
+      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     } else {
       # Non-docker run
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
       $command .= "stdbuf --output=0 --error=0 ";
-      $command .= "perl - ";
-      $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     }
 
     my @execargs = ('bash', '-c', $command);
-    srun (\@srunargs, \@execargs, undef, $build_script_to_send);
+    srun (\@srunargs, \@execargs, undef, $build_script);
     # exec() failed, we assume nothing happened.
-    Log(undef, "srun() failed on build script");
-    die;
+    die "srun() failed on build script\n";
   }
   close("writer");
   if (!defined $childpid)
@@ -725,6 +953,9 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
   delete $Jobstep->{stderr};
   delete $Jobstep->{finishtime};
 
+  $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
+  $Jobstep->{'arvados_task'}->save;
+
   splice @jobstep_todo, $todo_ptr, 1;
   --$todo_ptr;
 
@@ -732,14 +963,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 
   while (!@freeslot
         ||
-        (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
+        ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
   {
-    last THISROUND if $main::please_freeze;
+    last THISROUND if $main::please_freeze || defined($main::success);
     if ($main::please_info)
     {
       $main::please_info = 0;
       freeze();
-      collate_output();
+      create_output_collection();
       save_meta(1);
       update_progress_stats();
     }
@@ -757,6 +988,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     {
       update_progress_stats();
     }
+    $working_slot_count = scalar(grep { $_->{node}->{losing_streak} == 0 &&
+                                        $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
        ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
     {
@@ -780,10 +1013,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
 
     # give up if no nodes are succeeding
-    if (!grep { $_->{node}->{losing_streak} == 0 &&
-                    $_->{node}->{hold_count} < 4 } @slot) {
-      my $message = "Every node has failed -- giving up on this round";
-      Log (undef, $message);
+    if ($working_slot_count < 1) {
+      Log(undef, "Every node has failed -- giving up");
       last THISROUND;
     }
   }
@@ -801,7 +1032,7 @@ while (%proc)
     $main::please_continue = 0;
     goto THISROUND;
   }
-  $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
+  $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
   readfrompipes ();
   if (!reapchildren())
   {
@@ -819,18 +1050,18 @@ freeze_if_want_freeze();
 
 if (!defined $main::success)
 {
-  if (@jobstep_todo &&
-      $thisround_succeeded == 0 &&
-      ($thisround_failed == 0 || $thisround_failed > 4))
-  {
+  if (!@jobstep_todo) {
+    $main::success = 1;
+  } elsif ($working_slot_count < 1) {
+    save_output_collection();
+    save_meta();
+    exit(EX_RETRY_UNLOCKED);
+  } elsif ($thisround_succeeded == 0 &&
+           ($thisround_failed == 0 || $thisround_failed > 4)) {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
     $main::success = 0;
   }
-  if (!@jobstep_todo)
-  {
-    $main::success = 1;
-  }
 }
 
 goto ONELEVEL if !defined $main::success;
@@ -838,41 +1069,20 @@ goto ONELEVEL if !defined $main::success;
 
 release_allocation();
 freeze();
-my $collated_output = &collate_output();
+my $collated_output = save_output_collection();
+Log (undef, "finish");
 
-if ($job_has_uuid) {
-  $Job->update_attributes('running' => 0,
-                          'success' => $collated_output && $main::success,
-                          'finished_at' => scalar gmtime)
-}
+save_meta();
 
-if (!$collated_output) {
-  Log(undef, "output undef");
-}
-else {
-  eval {
-    open(my $orig_manifest, '-|', 'arv-get', $collated_output)
-        or die "failed to get collated manifest: $!";
-    my $orig_manifest_text = '';
-    while (my $manifest_line = <$orig_manifest>) {
-      $orig_manifest_text .= $manifest_line;
-    }
-    my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
-      'manifest_text' => $orig_manifest_text,
-    });
-    Log(undef, "output uuid " . $output->{uuid});
-    Log(undef, "output hash " . $output->{portable_data_hash});
-    $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
-  };
-  if ($@) {
-    Log (undef, "Failed to register output manifest: $@");
-  }
+my $final_state;
+if ($collated_output && $main::success) {
+  $final_state = 'Complete';
+} else {
+  $final_state = 'Failed';
 }
+$Job->update_attributes('state' => $final_state);
 
-Log (undef, "finish");
-
-save_meta();
-exit ($Job->{'success'} ? 1 : 0);
+exit (($final_state eq 'Complete') ? 0 : 1);
 
 
 
@@ -887,9 +1097,7 @@ sub update_progress_stats
   $Job->{'tasks_summary'}->{'todo'} = $todo;
   $Job->{'tasks_summary'}->{'done'} = $done;
   $Job->{'tasks_summary'}->{'running'} = $running;
-  if ($job_has_uuid) {
-    $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
-  }
+  $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
   Log (undef, "status: $done done, $running running, $todo todo");
   $progress_is_dirty = 0;
 }
@@ -910,10 +1118,7 @@ sub reapchildren
 
   my $childstatus = $?;
   my $exitvalue = $childstatus >> 8;
-  my $exitinfo = sprintf("exit %d signal %d%s",
-                         $exitvalue,
-                         $childstatus & 127,
-                         ($childstatus & 128 ? ' core dump' : ''));
+  my $exitinfo = "exit ".exit_status_s($childstatus);
   $Jobstep->{'arvados_task'}->reload;
   my $task_success = $Jobstep->{'arvados_task'}->{success};
 
@@ -930,7 +1135,7 @@ sub reapchildren
   {
     my $temporary_fail;
     $temporary_fail ||= $Jobstep->{node_fail};
-    $temporary_fail ||= ($exitvalue == 111);
+    $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
 
     ++$thisround_failed;
     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
@@ -951,18 +1156,15 @@ sub reapchildren
 
     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                              ++$Jobstep->{'failures'},
-                             $temporary_fail ? 'temporary ' : 'permanent',
+                             $temporary_fail ? 'temporary' : 'permanent',
                              $elapsed));
 
     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
       # Give up on this task, and the whole job
       $main::success = 0;
-      $main::please_freeze = 1;
-    }
-    else {
-      # Put this task back on the todo queue
-      push @jobstep_todo, $jobstepid;
     }
+    # Put this task back on the todo queue
+    push @jobstep_todo, $jobstepid;
     $Job->{'tasks_summary'}->{'failed'}++;
   }
   else
@@ -975,8 +1177,12 @@ sub reapchildren
   }
   $Jobstep->{exitcode} = $childstatus;
   $Jobstep->{finishtime} = time;
+  $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
+  $Jobstep->{'arvados_task'}->save;
   process_stderr ($jobstepid, $task_success);
-  Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
+  Log ($jobstepid, sprintf("task output (%d bytes): %s",
+                           length($Jobstep->{'arvados_task'}->{output}),
+                           $Jobstep->{'arvados_task'}->{output}));
 
   close $reader{$jobstepid};
   delete $reader{$jobstepid};
@@ -989,7 +1195,8 @@ sub reapchildren
     my $newtask_list = [];
     my $newtask_results;
     do {
-      $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
+      $newtask_results = api_call(
+        "job_tasks/list",
         'where' => {
           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
         },
@@ -1018,48 +1225,66 @@ sub check_refresh_wanted
   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
   if (@stat && $stat[9] > $latest_refresh) {
     $latest_refresh = scalar time;
-    if ($job_has_uuid) {
-      my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
-      for my $attr ('cancelled_at',
-                    'cancelled_by_user_uuid',
-                    'cancelled_by_client_uuid') {
-        $Job->{$attr} = $Job2->{$attr};
-      }
-      if ($Job->{'cancelled_at'}) {
-        Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
-             " by user " . $Job->{cancelled_by_user_uuid});
-        $main::success = 0;
-        $main::please_freeze = 1;
+    my $Job2 = api_call("jobs/get", uuid => $jobspec);
+    for my $attr ('cancelled_at',
+                  'cancelled_by_user_uuid',
+                  'cancelled_by_client_uuid',
+                  'state') {
+      $Job->{$attr} = $Job2->{$attr};
+    }
+    if ($Job->{'state'} ne "Running") {
+      if ($Job->{'state'} eq "Cancelled") {
+        Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
+      } else {
+        Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
       }
+      $main::success = 0;
+      $main::please_freeze = 1;
     }
   }
 }
 
 sub check_squeue
 {
-  # return if the kill list was checked <4 seconds ago
-  if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
-  {
-    return;
-  }
-  $squeue_kill_checked = time;
+  my $last_squeue_check = $squeue_checked;
 
-  # use killem() on procs whose killtime is reached
-  for (keys %proc)
+  # Do not call `squeue` or check the kill list more than once every
+  # 15 seconds.
+  return if $last_squeue_check > time - 15;
+  $squeue_checked = time;
+
+  # Look for children from which we haven't received stderr data since
+  # the last squeue check. If no such children exist, all procs are
+  # alive and there's no need to even look at squeue.
+  #
+  # As long as the crunchstat poll interval (10s) is shorter than the
+  # squeue check interval (15s) this should make the squeue check an
+  # infrequent event.
+  my $silent_procs = 0;
+  for my $jobstep (values %proc)
   {
-    if (exists $proc{$_}->{killtime}
-       && $proc{$_}->{killtime} <= time)
+    if ($jobstep->{stderr_at} < $last_squeue_check)
     {
-      killem ($_);
+      $silent_procs++;
     }
   }
+  return if $silent_procs == 0;
 
-  # return if the squeue was checked <60 seconds ago
-  if (defined $squeue_checked && $squeue_checked > time - 60)
+  # use killem() on procs whose killtime is reached
+  while (my ($pid, $jobstep) = each %proc)
   {
-    return;
+    if (exists $jobstep->{killtime}
+        && $jobstep->{killtime} <= time
+        && $jobstep->{stderr_at} < $last_squeue_check)
+    {
+      my $sincewhen = "";
+      if ($jobstep->{stderr_at}) {
+        $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+      }
+      Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+      killem ($pid);
+    }
   }
-  $squeue_checked = time;
 
   if (!$have_slurm)
   {
@@ -1067,37 +1292,46 @@ sub check_squeue
     return;
   }
 
-  # get a list of steps still running
-  my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
-  chop @squeue;
-  if ($squeue[-1] ne "ok")
+  # Get a list of steps still running.  Note: squeue(1) says --steps
+  # selects a format (which we override anyway) and allows us to
+  # specify which steps we're interested in (which we don't).
+  # Importantly, it also changes the meaning of %j from "job name" to
+  # "step name" and (although this isn't mentioned explicitly in the
+  # docs) switches from "one line per job" mode to "one line per step"
+  # mode. Without it, we'd just get a list of one job, instead of a
+  # list of N steps.
+  my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
+  if ($? != 0)
   {
+    Log(undef, "warning: squeue exit status $? ($!)");
     return;
   }
-  pop @squeue;
+  chop @squeue;
 
   # which of my jobsteps are running, according to squeue?
   my %ok;
-  foreach (@squeue)
+  for my $jobstepname (@squeue)
   {
-    if (/^(\d+)\.(\d+) (\S+)/)
-    {
-      if ($1 eq $ENV{SLURM_JOBID})
-      {
-       $ok{$3} = 1;
-      }
-    }
+    $ok{$jobstepname} = 1;
   }
 
-  # which of my active child procs (>60s old) were not mentioned by squeue?
-  foreach (keys %proc)
+  # Check for child procs >60s old and not mentioned by squeue.
+  while (my ($pid, $jobstep) = each %proc)
   {
-    if ($proc{$_}->{time} < time - 60
-       && !exists $ok{$proc{$_}->{jobstepname}}
-       && !exists $proc{$_}->{killtime})
+    if ($jobstep->{time} < time - 60
+        && $jobstep->{jobstepname}
+        && !exists $ok{$jobstep->{jobstepname}}
+        && !exists $jobstep->{killtime})
     {
-      # kill this proc if it hasn't exited in 30 seconds
-      $proc{$_}->{killtime} = time + 30;
+      # According to slurm, this task has ended (successfully or not)
+      # -- but our srun child hasn't exited. First we must wait (30
+      # seconds) in case this is just a race between communication
+      # channels. Then, if our srun child process still hasn't
+      # terminated, we'll conclude some slurm communication
+      # error/delay has caused the task to die without notifying srun,
+      # and we'll kill srun ourselves.
+      $jobstep->{killtime} = time + 30;
+      Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
     }
   }
 }
@@ -1108,7 +1342,7 @@ sub release_allocation
   if ($have_slurm)
   {
     Log (undef, "release job allocation");
-    system "scancel $ENV{SLURM_JOBID}";
+    system "scancel $ENV{SLURM_JOB_ID}";
   }
 }
 
@@ -1122,6 +1356,7 @@ sub readfrompipes
     while (0 < sysread ($reader{$job}, $buf, 8192))
     {
       print STDERR $buf if $ENV{CRUNCH_DEBUG};
+      $jobstep[$job]->{stderr_at} = time;
       $jobstep[$job]->{stderr} .= $buf;
       preprocess_stderr ($job);
       if (length ($jobstep[$job]->{stderr}) > 16384)
@@ -1147,7 +1382,7 @@ sub preprocess_stderr
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
+    elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
       $jobstep[$job]->{node_fail} = 1;
       ban_node_by_slot($jobstep[$job]->{slotindex});
     }
@@ -1169,16 +1404,19 @@ sub process_stderr
 sub fetch_block
 {
   my $hash = shift;
-  my ($keep, $child_out, $output_block);
-
-  my $cmd = "arv-get \Q$hash\E";
-  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
-  $output_block = '';
+  my $keep;
+  if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
+    Log(undef, "fetch_block run error from arv-get $hash: $!");
+    return undef;
+  }
+  my $output_block = "";
   while (1) {
     my $buf;
     my $bytes = sysread($keep, $buf, 1024 * 1024);
     if (!defined $bytes) {
-      die "reading from arv-get: $!";
+      Log(undef, "fetch_block read error from arv-get: $!");
+      $output_block = undef;
+      last;
     } elsif ($bytes == 0) {
       # sysread returns 0 at the end of the pipe.
       last;
@@ -1188,58 +1426,98 @@ sub fetch_block
     }
   }
   close $keep;
+  if ($?) {
+    Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
+    $output_block = undef;
+  }
   return $output_block;
 }
 
-sub collate_output
+# Create a collection by concatenating the output of all tasks (each
+# task's output is either a manifest fragment, a locator for a
+# manifest fragment stored in Keep, or nothing at all). Return the
+# portable_data_hash of the new collection.
+sub create_output_collection
 {
   Log (undef, "collate");
 
   my ($child_out, $child_in);
-  my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
-  my $joboutput;
+  my $pid = open2($child_out, $child_in, 'python', '-c', q{
+import arvados
+import sys
+print (arvados.api("v1").collections().
+       create(body={"manifest_text": sys.stdin.read()}).
+       execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
+}, retry_count());
+
+  my $task_idx = -1;
+  my $manifest_size = 0;
   for (@jobstep)
   {
-    next if (!exists $_->{'arvados_task'}->{'output'} ||
-             !$_->{'arvados_task'}->{'success'});
+    ++$task_idx;
     my $output = $_->{'arvados_task'}->{output};
-    if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
-    {
-      $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
-      print $child_in $output;
-    }
-    elsif (@jobstep == 1)
-    {
-      $joboutput = $output;
-      last;
-    }
-    elsif (defined (my $outblock = fetch_block ($output)))
-    {
-      $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
-      print $child_in $outblock;
+    next if (!defined($output));
+    my $next_write;
+    if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
+      $next_write = fetch_block($output);
+    } else {
+      $next_write = $output;
     }
-    else
-    {
-      Log (undef, "XXX fetch_block($output) failed XXX");
+    if (defined($next_write)) {
+      if (!defined(syswrite($child_in, $next_write))) {
+        # There's been an error writing.  Stop the loop.
+        # We'll log details about the exit code later.
+        last;
+      } else {
+        $manifest_size += length($next_write);
+      }
+    } else {
+      my $uuid = $_->{'arvados_task'}->{'uuid'};
+      Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
       $main::success = 0;
     }
   }
-  $child_in->close;
+  close($child_in);
+  Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
 
-  if (!defined $joboutput) {
-    my $s = IO::Select->new($child_out);
-    if ($s->can_read(120)) {
-      sysread($child_out, $joboutput, 64 * 1024 * 1024);
-      chomp($joboutput);
+  my $joboutput;
+  my $s = IO::Select->new($child_out);
+  if ($s->can_read(120)) {
+    sysread($child_out, $joboutput, 1024 * 1024);
+    waitpid($pid, 0);
+    if ($?) {
+      Log(undef, "output collection creation exited " . exit_status_s($?));
+      $joboutput = undef;
     } else {
-      Log (undef, "timed out reading from 'arv-put'");
+      chomp($joboutput);
+    }
+  } else {
+    Log (undef, "timed out while creating output collection");
+    foreach my $signal (2, 2, 2, 15, 15, 9) {
+      kill($signal, $pid);
+      last if waitpid($pid, WNOHANG) == -1;
+      sleep(1);
     }
   }
-  waitpid($pid, 0);
+  close($child_out);
 
   return $joboutput;
 }
 
+# Calls create_output_collection, logs the result, and returns it.
+# If that was successful, save that as the output in the job record.
+sub save_output_collection {
+  my $collated_output = create_output_collection();
+
+  if (!$collated_output) {
+    Log(undef, "Failed to write output collection");
+  }
+  else {
+    Log(undef, "job output $collated_output");
+    $Job->update_attributes('output' => $collated_output);
+  }
+  return $collated_output;
+}
 
 sub killem
 {
@@ -1282,6 +1560,102 @@ sub fhbits
 }
 
 
+# Send log output to Keep via arv-put.
+#
+# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_out_buf is a string containing all output read from arv-put so far.
+# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
+# $log_pipe_pid is the pid of the arv-put subprocess.
+#
+# The only functions that should access these variables directly are:
+#
+# log_writer_start($logfilename)
+#     Starts an arv-put pipe, reading data on stdin and writing it to
+#     a $logfilename file in an output collection.
+#
+# log_writer_read_output([$timeout])
+#     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
+#     Passes $timeout to the select() call, with a default of 0.01.
+#     Returns the result of the last read() call on $log_pipe_out, or
+#     -1 if read() wasn't called because select() timed out.
+#     Only other log_writer_* functions should need to call this.
+#
+# log_writer_send($txt)
+#     Writes $txt to the output log collection.
+#
+# log_writer_finish()
+#     Closes the arv-put pipe and returns the output that it produces.
+#
+# log_writer_is_active()
+#     Returns a true value if there is currently a live arv-put
+#     process, false otherwise.
+#
+my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
+    $log_pipe_pid);
+
+sub log_writer_start($)
+{
+  my $logfilename = shift;
+  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
+                        'arv-put',
+                        '--stream',
+                        '--retries', '3',
+                        '--filename', $logfilename,
+                        '-');
+  $log_pipe_out_buf = "";
+  $log_pipe_out_select = IO::Select->new($log_pipe_out);
+}
+
+sub log_writer_read_output {
+  my $timeout = shift || 0.01;
+  my $read = -1;
+  while ($read && $log_pipe_out_select->can_read($timeout)) {
+    $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
+                 length($log_pipe_out_buf));
+  }
+  if (!defined($read)) {
+    Log(undef, "error reading log manifest from arv-put: $!");
+  }
+  return $read;
+}
+
+sub log_writer_send($)
+{
+  my $txt = shift;
+  print $log_pipe_in $txt;
+  log_writer_read_output();
+}
+
+sub log_writer_finish()
+{
+  return unless $log_pipe_pid;
+
+  close($log_pipe_in);
+
+  my $read_result = log_writer_read_output(120);
+  if ($read_result == -1) {
+    Log (undef, "timed out reading from 'arv-put'");
+  } elsif ($read_result != 0) {
+    Log(undef, "failed to read arv-put log manifest to EOF");
+  }
+
+  waitpid($log_pipe_pid, 0);
+  if ($?) {
+    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
+  }
+
+  close($log_pipe_out);
+  my $arv_put_output = $log_pipe_out_buf;
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
+      $log_pipe_out_select = undef;
+
+  return $arv_put_output;
+}
+
+sub log_writer_is_active() {
+  return $log_pipe_pid;
+}
+
 sub Log                                # ($jobstep_id, $logmessage)
 {
   if ($_[1] =~ /\n/) {
@@ -1295,15 +1669,15 @@ sub Log                         # ($jobstep_id, $logmessage)
   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
   $message .= "\n";
   my $datetime;
-  if ($local_logfile || -t STDERR) {
+  if (log_writer_is_active() || -t STDERR) {
     my @gmtime = gmtime;
     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
                         $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
-  if ($local_logfile) {
-    print $local_logfile $datetime . " " . $message;
+  if (log_writer_is_active()) {
+    log_writer_send($datetime . " " . $message);
   }
 }
 
@@ -1314,19 +1688,21 @@ sub croak
   my $message = "@_ at $file line $line\n";
   Log (undef, $message);
   freeze() if @jobstep_todo;
-  collate_output() if @jobstep_todo;
+  create_output_collection() if @jobstep_todo;
   cleanup();
-  save_meta() if $local_logfile;
+  save_meta();
   die;
 }
 
 
 sub cleanup
 {
-  return if !$job_has_uuid;
-  $Job->update_attributes('running' => 0,
-                          'success' => 0,
-                          'finished_at' => scalar gmtime);
+  return unless $Job;
+  if ($Job->{'state'} eq 'Cancelled') {
+    $Job->update_attributes('finished_at' => scalar gmtime);
+  } else {
+    $Job->update_attributes('state' => 'Failed');
+  }
 }
 
 
@@ -1334,18 +1710,23 @@ sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
+  return unless log_writer_is_active();
 
-  $local_logfile->flush;
-  my $cmd = "arv-put --portable-data-hash --filename ''\Q$keep_logfile\E "
-      . quotemeta($local_logfile->filename);
-  my $loglocator = `$cmd`;
-  die "system $cmd failed: $?" if $?;
-  chomp($loglocator);
+  my $log_manifest = "";
+  if ($Job->{log}) {
+    my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
+    $log_manifest .= $prev_log_coll->{manifest_text};
+  }
+  $log_manifest .= log_writer_finish();
 
-  $local_logfile = undef;   # the temp file is automatically deleted
-  Log (undef, "log manifest is $loglocator");
-  $Job->{'log'} = $loglocator;
-  $Job->update_attributes('log', $loglocator) if $job_has_uuid;
+  my $log_coll = api_call(
+    "collections/create", ensure_unique_name => 1, collection => {
+      manifest_text => $log_manifest,
+      owner_uuid => $Job->{owner_uuid},
+      name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
+    });
+  Log(undef, "log collection is " . $log_coll->{portable_data_hash});
+  $Job->update_attributes('log' => $log_coll->{portable_data_hash});
 }
 
 
@@ -1370,10 +1751,10 @@ sub freeze_if_want_freeze
       }
     }
     freeze();
-    collate_output();
+    create_output_collection();
     cleanup();
     save_meta();
-    exit 0;
+    exit 1;
   }
 }
 
@@ -1415,11 +1796,19 @@ sub srun
   my $opts = shift || {};
   my $stdin = shift;
   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
-  print STDERR (join (" ",
-                     map { / / ? "'$_'" : $_ }
-                     (@$args)),
-               "\n")
-      if $ENV{CRUNCH_DEBUG};
+
+  $Data::Dumper::Terse = 1;
+  $Data::Dumper::Indent = 0;
+  my $show_cmd = Dumper($args);
+  $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
+  $show_cmd =~ s/\n/ /g;
+  if ($opts->{fork}) {
+    Log(undef, "starting: $show_cmd");
+  } else {
+    # This is a child process: parent is in charge of reading our
+    # stderr and copying it to Log() if needed.
+    warn "starting: $show_cmd\n";
+  }
 
   if (defined $stdin) {
     my $child = open STDIN, "-|";
@@ -1461,104 +1850,420 @@ sub find_docker_image {
   # If so, return its stream name and Docker hash.
   # If not, return undef for both values.
   my $locator = shift;
-  if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
-    my @file_list = @{$image->{files}};
-    if ((scalar(@file_list) == 1) &&
-        ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
-      return ($file_list[0][0], $1);
+  my ($streamname, $filename);
+  my $image = api_call("collections/get", uuid => $locator);
+  if ($image) {
+    foreach my $line (split(/\n/, $image->{manifest_text})) {
+      my @tokens = split(/\s+/, $line);
+      next if (!@tokens);
+      $streamname = shift(@tokens);
+      foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
+        if (defined($filename)) {
+          return (undef, undef);  # More than one file in the Collection.
+        } else {
+          $filename = (split(/:/, $filedata, 3))[2];
+        }
+      }
+    }
+  }
+  if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
+    return ($streamname, $1);
+  } else {
+    return (undef, undef);
+  }
+}
+
+sub retry_count {
+  # Calculate the number of times an operation should be retried,
+  # assuming exponential backoff, and that we're willing to retry as
+  # long as tasks have been running.  Enforce a minimum of 3 retries.
+  my ($starttime, $endtime, $timediff, $retries);
+  if (@jobstep) {
+    $starttime = $jobstep[0]->{starttime};
+    $endtime = $jobstep[-1]->{finishtime};
+  }
+  if (!defined($starttime)) {
+    $timediff = 0;
+  } elsif (!defined($endtime)) {
+    $timediff = time - $starttime;
+  } else {
+    $timediff = ($endtime - $starttime) - (time - $endtime);
+  }
+  if ($timediff > 0) {
+    $retries = int(log($timediff) / log(2));
+  } else {
+    $retries = 1;  # Use the minimum.
+  }
+  return ($retries > 3) ? $retries : 3;
+}
+
+sub retry_op {
+  # Pass in two function references.
+  # This method will be called with the remaining arguments.
+  # If it dies, retry it with exponential backoff until it succeeds,
+  # or until the current retry_count is exhausted.  After each failure
+  # that can be retried, the second function will be called with
+  # the current try count (0-based), next try time, and error message.
+  my $operation = shift;
+  my $retry_callback = shift;
+  my $retries = retry_count();
+  foreach my $try_count (0..$retries) {
+    my $next_try = time + (2 ** $try_count);
+    my $result = eval { $operation->(@_); };
+    if (!$@) {
+      return $result;
+    } elsif ($try_count < $retries) {
+      $retry_callback->($try_count, $next_try, $@);
+      my $sleep_time = $next_try - time;
+      sleep($sleep_time) if ($sleep_time > 0);
+    }
+  }
+  # Ensure the error message ends in a newline, so Perl doesn't add
+  # retry_op's line number to it.
+  chomp($@);
+  die($@ . "\n");
+}
+
+sub api_call {
+  # Pass in a /-separated API method name, and arguments for it.
+  # This function will call that method, retrying as needed until
+  # the current retry_count is exhausted, with a log on the first failure.
+  my $method_name = shift;
+  my $log_api_retry = sub {
+    my ($try_count, $next_try_at, $errmsg) = @_;
+    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+    $errmsg =~ s/\s/ /g;
+    $errmsg =~ s/\s+$//;
+    my $retry_msg;
+    if ($next_try_at < time) {
+      $retry_msg = "Retrying.";
+    } else {
+      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
+      $retry_msg = "Retrying at $next_try_fmt.";
+    }
+    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
+  };
+  my $method = $arv;
+  foreach my $key (split(/\//, $method_name)) {
+    $method = $method->{$key};
+  }
+  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+}
+
+sub exit_status_s {
+  # Given a $?, return a human-readable exit code string like "0" or
+  # "1" or "0 with signal 1" or "1 with signal 11".
+  my $exitcode = shift;
+  my $s = $exitcode >> 8;
+  if ($exitcode & 0x7f) {
+    $s .= " with signal " . ($exitcode & 0x7f);
+  }
+  if ($exitcode & 0x80) {
+    $s .= " with core dump";
+  }
+  return $s;
+}
+
+sub handle_readall {
+  # Pass in a glob reference to a file handle.
+  # Read all its contents and return them as a string.
+  my $fh_glob_ref = shift;
+  local $/ = undef;
+  return <$fh_glob_ref>;
+}
+
+sub tar_filename_n {
+  my $n = shift;
+  return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
+}
+
+sub add_git_archive {
+  # Pass in a git archive command as a string or list, a la system().
+  # This method will save its output to be included in the archive sent to the
+  # build script.
+  my $git_input;
+  $git_tar_count++;
+  if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
+    croak("Failed to save git archive: $!");
+  }
+  my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
+  close($git_input);
+  waitpid($git_pid, 0);
+  close(GIT_ARCHIVE);
+  if ($?) {
+    croak("Failed to save git archive: git exited " . exit_status_s($?));
+  }
+}
+
+sub combined_git_archive {
+  # Combine all saved tar archives into a single archive, then return its
+  # contents in a string.  Return undef if no archives have been saved.
+  if ($git_tar_count < 1) {
+    return undef;
+  }
+  my $base_tar_name = tar_filename_n(1);
+  foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
+    my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
+    if ($tar_exit != 0) {
+      croak("Error preparing build archive: tar -A exited " .
+            exit_status_s($tar_exit));
     }
   }
-  return (undef, undef);
+  if (!open(GIT_TAR, "<", $base_tar_name)) {
+    croak("Could not open build archive: $!");
+  }
+  my $tar_contents = handle_readall(\*GIT_TAR);
+  close(GIT_TAR);
+  return $tar_contents;
+}
+
+sub set_nonblocking {
+  my $fh = shift;
+  my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
+  fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
 }
 
 __DATA__
 #!/usr/bin/perl
-
-# checkout-and-build
+#
+# This is crunch-job's internal dispatch script.  crunch-job running on the API
+# server invokes this script on individual compute nodes, or localhost if we're
+# running a job locally.  It gets called in two modes:
+#
+# * No arguments: Installation mode.  Read a tar archive from the DATA
+#   file handle; it includes the Crunch script's source code, and
+#   maybe SDKs as well.  Those should be installed in the proper
+#   locations.  This runs outside of any Docker container, so don't try to
+#   introspect Crunch's runtime environment.
+#
+# * With arguments: Crunch script run mode.  This script should set up the
+#   environment, then run the command specified in the arguments.  This runs
+#   inside any Docker container.
 
 use Fcntl ':flock';
-use File::Path qw( make_path );
+use File::Path qw( make_path remove_tree );
+use POSIX qw(getcwd);
+
+use constant TASK_TEMPFAIL => 111;
+
+# Map SDK subdirectories to the path environments they belong to.
+my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
 
 my $destdir = $ENV{"CRUNCH_SRC"};
-my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
+my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
 my $repo = $ENV{"CRUNCH_SRC_URL"};
+my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
+my $job_work = $ENV{"JOB_WORK"};
 my $task_work = $ENV{"TASK_WORK"};
 
-for my $dir ($destdir, $task_work) {
-    if ($dir) {
-        make_path $dir;
-        -e $dir or die "Failed to create temporary directory ($dir): $!";
+open(STDOUT_ORIG, ">&", STDOUT);
+open(STDERR_ORIG, ">&", STDERR);
+
+for my $dir ($destdir, $job_work, $task_work) {
+  if ($dir) {
+    make_path $dir;
+    -e $dir or die "Failed to create temporary directory ($dir): $!";
+  }
+}
+
+if ($task_work) {
+  remove_tree($task_work, {keep_root => 1});
+}
+
+### Crunch script run mode
+if (@ARGV) {
+  # We want to do routine logging during task 0 only.  This gives the user
+  # the information they need, but avoids repeating the information for every
+  # task.
+  my $Log;
+  if ($ENV{TASK_SEQUENCE} eq "0") {
+    $Log = sub {
+      my $msg = shift;
+      printf STDERR_ORIG "[Crunch] $msg\n", @_;
+    };
+  } else {
+    $Log = sub { };
+  }
+
+  my $python_src = "$install_dir/python";
+  my $venv_dir = "$job_work/.arvados.venv";
+  my $venv_built = -e "$venv_dir/bin/activate";
+  if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
+    shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
+                 "--python=python2.7", $venv_dir);
+    shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
+    $venv_built = 1;
+    $Log->("Built Python SDK virtualenv");
+  }
+
+  my $pip_bin = "pip";
+  if ($venv_built) {
+    $Log->("Running in Python SDK virtualenv");
+    $pip_bin = "$venv_dir/bin/pip";
+    my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
+    @ARGV = ("/bin/sh", "-ec",
+             ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
+  } elsif (-d $python_src) {
+    $Log->("Warning: virtualenv not found inside Docker container default " .
+           "\$PATH. Can't install Python SDK.");
+  }
+
+  my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
+  if ($pkgs) {
+    $Log->("Using Arvados SDK:");
+    foreach my $line (split /\n/, $pkgs) {
+      $Log->($line);
+    }
+  } else {
+    $Log->("Arvados SDK packages not found");
+  }
+
+  while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
+    my $sdk_path = "$install_dir/$sdk_dir";
+    if (-d $sdk_path) {
+      if ($ENV{$sdk_envkey}) {
+        $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
+      } else {
+        $ENV{$sdk_envkey} = $sdk_path;
+      }
+      $Log->("Arvados SDK added to %s", $sdk_envkey);
     }
+  }
+
+  exec(@ARGV);
+  die "Cannot exec `@ARGV`: $!";
 }
 
+### Installation mode
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
-if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
-    if (@ARGV) {
-        exec(@ARGV);
-        die "Cannot exec `@ARGV`: $!";
-    } else {
-        exit 0;
-    }
-}
+if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
+  # This exact git archive (source + arvados sdk) is already installed
+  # here, so there's no need to reinstall it.
+
+  # We must consume our DATA section, though: otherwise the process
+  # feeding it to us will get SIGPIPE.
+  my $buf;
+  while (read(DATA, $buf, 65536)) { }
 
-unlink "$destdir.commit";
-open STDOUT, ">", "$destdir.log";
-open STDERR, ">&STDOUT";
+  exit(0);
+}
 
+unlink "$destdir.archive_hash";
 mkdir $destdir;
-my @git_archive_data = <DATA>;
-if (@git_archive_data) {
-  open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
-  print TARX @git_archive_data;
+
+do {
+  # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
+  local $SIG{PIPE} = "IGNORE";
+  warn "Extracting archive: $archive_hash\n";
+  # --ignore-zeros is necessary sometimes: depending on how much NUL
+  # padding tar -A put on our combined archive (which in turn depends
+  # on the length of the component archives) tar without
+  # --ignore-zeros will exit before consuming stdin and cause close()
+  # to fail on the resulting SIGPIPE.
+  if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
+    die "Error launching 'tar -xC $destdir': $!";
+  }
+  # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
+  # get SIGPIPE.  We must feed it data incrementally.
+  my $tar_input;
+  while (read(DATA, $tar_input, 65536)) {
+    print TARX $tar_input;
+  }
   if(!close(TARX)) {
-    die "'tar -C $destdir -xf -' exited $?: $!";
+    die "'tar -xC $destdir' exited $?: $!";
   }
-}
+};
 
-my $pwd;
-chomp ($pwd = `pwd`);
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
 mkdir $install_dir;
 
-for my $src_path ("$destdir/arvados/sdk/python") {
-  if (-d $src_path) {
-    shell_or_die ("virtualenv", $install_dir);
-    shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
+my $sdk_root = "$destdir/.arvados.sdk/sdk";
+if (-d $sdk_root) {
+  foreach my $sdk_lang (("python",
+                         map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
+    if (-d "$sdk_root/$sdk_lang") {
+      if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
+        die "Failed to install $sdk_lang SDK: $!";
+      }
+    }
+  }
+}
+
+my $python_dir = "$install_dir/python";
+if ((-d $python_dir) and can_run("python2.7")) {
+  open(my $egg_info_pipe, "-|",
+       "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
+  my @egg_info_errors = <$egg_info_pipe>;
+  close($egg_info_pipe);
+  if ($?) {
+    if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
+      # egg_info apparently failed because it couldn't ask git for a build tag.
+      # Specify no build tag.
+      open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
+      print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
+      close($pysdk_cfg);
+    } else {
+      my $egg_info_exit = $? >> 8;
+      foreach my $errline (@egg_info_errors) {
+        print STDERR_ORIG $errline;
+      }
+      warn "python setup.py egg_info failed: exit $egg_info_exit";
+      exit ($egg_info_exit || 1);
+    }
   }
 }
 
+# Hide messages from the install script (unless it fails: shell_or_die
+# will show $destdir.log in that case).
+open(STDOUT, ">>", "$destdir.log");
+open(STDERR, ">&", STDOUT);
+
 if (-e "$destdir/crunch_scripts/install") {
-    shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
+    shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
     # Old version
-    shell_or_die ("./tests/autotests.sh", $install_dir);
+    shell_or_die (undef, "./tests/autotests.sh", $install_dir);
 } elsif (-e "./install.sh") {
-    shell_or_die ("./install.sh", $install_dir);
+    shell_or_die (undef, "./install.sh", $install_dir);
 }
 
-if ($commit) {
-    unlink "$destdir.commit.new";
-    symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
-    rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
+if ($archive_hash) {
+    unlink "$destdir.archive_hash.new";
+    symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
+    rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
 }
 
 close L;
 
-if (@ARGV) {
-    exec(@ARGV);
-    die "Cannot exec `@ARGV`: $!";
-} else {
-    exit 0;
+sub can_run {
+  my $command_name = shift;
+  open(my $which, "-|", "which", $command_name);
+  while (<$which>) { }
+  close($which);
+  return ($? == 0);
 }
 
 sub shell_or_die
 {
+  my $exitcode = shift;
+
   if ($ENV{"DEBUG"}) {
     print STDERR "@_\n";
   }
-  system (@_) == 0
-      or die "@_ failed: $! exit 0x".sprintf("%x",$?);
+  if (system (@_) != 0) {
+    my $err = $!;
+    my $code = $?;
+    my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
+    open STDERR, ">&STDERR_ORIG";
+    system ("cat $destdir.log >&2");
+    warn "@_ failed ($err): $exitstatus";
+    if (defined($exitcode)) {
+      exit $exitcode;
+    }
+    else {
+      exit (($code >> 8) || 1);
+    }
+  }
 }
 
 __DATA__