Merge branch '3775-fetch-git-repo' closes #3775
[arvados.git] / sdk / cli / bin / crunch-job
index 70f379e53fd9cc307bd933bc1b21276097863e4a..369bc3e1ae6f021fbd809ecc7fa82c4da2a77ccc 100755 (executable)
@@ -10,12 +10,14 @@ crunch-job: Execute job steps, save snapshots as requested, collate output.
 Obtain job details from Arvados, run tasks on compute nodes (typically
 invoked by scheduler on controller):
 
- crunch-job --job x-y-z
+ crunch-job --job x-y-z --git-dir /path/to/repo/.git
 
 Obtain job details from command line, run tasks on local machine
 (typically invoked by application or developer on VM):
 
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
+ crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
+
+ crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
 
 =head1 OPTIONS
 
@@ -27,7 +29,9 @@ If the job is already locked, steal the lock and run it anyway.
 
 =item --git-dir
 
-Path to .git directory where the specified commit is found.
+Path to a .git directory (or a git URL) where the commit given in the
+job's C<script_version> attribute is to be found. If this is I<not>
+given, the job's C<repository> attribute will be used.
 
 =item --job-api-token
 
@@ -39,6 +43,11 @@ Do not clear per-job/task temporary directories during initial job
 setup. This can speed up development and debugging when running jobs
 locally.
 
+=item --job
+
+UUID of the job to run, or a JSON-encoded job resource without a
+UUID. If the latter is given, a new job object will be created.
+
 =back
 
 =head1 RUNNING JOBS LOCALLY
@@ -83,7 +92,7 @@ use IPC::Open2;
 use IO::Select;
 use File::Temp;
 use Fcntl ':flock';
-use File::Path qw( make_path );
+use File::Path qw( make_path remove_tree );
 
 use constant EX_TEMPFAIL => 75;
 
@@ -125,8 +134,7 @@ if (defined $job_api_token) {
 }
 
 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
-my $local_job = !$job_has_uuid;
+my $local_job = 0;
 
 
 $SIG{'USR1'} = sub
@@ -141,38 +149,31 @@ $SIG{'USR2'} = sub
 
 
 my $arv = Arvados->new('apiVersion' => 'v1');
-my $local_logfile;
 
-my $User = $arv->{'users'}->{'current'}->execute;
-
-my $Job = {};
+my $Job;
 my $job_id;
 my $dbh;
 my $sth;
-if ($job_has_uuid)
+my @jobstep;
+
+my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
+
+if ($jobspec =~ /^[-a-z\d]+$/)
 {
-  $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+  # $jobspec is an Arvados UUID, not a JSON job specification
+  $Job = retry_op(sub {
+    $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+  });
   if (!$force_unlock) {
-    # If some other crunch-job process has grabbed this job (or we see
-    # other evidence that the job is already underway) we exit
-    # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
-    # mark the job as failed.
-    if ($Job->{'is_locked_by_uuid'}) {
-      Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
+    # Claim this job, and make sure nobody else does
+    eval { retry_op(sub {
+      # lock() sets is_locked_by_uuid and changes state to Running.
+      $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
+    }); };
+    if ($@) {
+      Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
       exit EX_TEMPFAIL;
-    }
-    if ($Job->{'success'} ne undef) {
-      Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'running'}) {
-      Log(undef, "Job 'running' flag is already set");
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'started_at'}) {
-      Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
-      exit EX_TEMPFAIL;
-    }
+    };
   }
 }
 else
@@ -187,15 +188,14 @@ else
 
   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
   $Job->{'started_at'} = gmtime;
+  $Job->{'state'} = 'Running';
 
-  $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
-
-  $job_has_uuid = 1;
+  $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
 }
 $job_id = $Job->{'uuid'};
 
 my $keep_logfile = $job_id . '.log.txt';
-$local_logfile = File::Temp->new();
+log_writer_start($keep_logfile);
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
@@ -277,25 +277,11 @@ foreach (@sinfo)
 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
 
 
-
-my $jobmanager_id;
-if ($job_has_uuid)
-{
-  # Claim this job, and make sure nobody else does
-  unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
-          $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
-    Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
-    exit EX_TEMPFAIL;
-  }
-  $Job->update_attributes('started_at' => scalar gmtime,
-                          'running' => 1,
-                          'success' => undef,
-                          'tasks_summary' => { 'failed' => 0,
-                                               'todo' => 1,
-                                               'running' => 0,
-                                               'done' => 0 });
-}
-
+$Job->update_attributes(
+  'tasks_summary' => { 'failed' => 0,
+                       'todo' => 1,
+                       'running' => 0,
+                       'done' => 0 });
 
 Log (undef, "start");
 $SIG{'INT'} = sub { $main::please_freeze = 1; };
@@ -317,7 +303,6 @@ $ENV{"CRUNCH_JOB_UUID"} = $job_id;
 $ENV{"JOB_UUID"} = $job_id;
 
 
-my @jobstep;
 my @jobstep_todo = ();
 my @jobstep_done = ();
 my @jobstep_tomerge = ();
@@ -335,12 +320,14 @@ if (defined $Job->{thawedfromkey})
 }
 else
 {
-  my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
-    'job_uuid' => $Job->{'uuid'},
-    'sequence' => 0,
-    'qsequence' => 0,
-    'parameters' => {},
-                                                          });
+  my $first_task = retry_op(sub {
+    $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
+      'job_uuid' => $Job->{'uuid'},
+      'sequence' => 0,
+      'qsequence' => 0,
+      'parameters' => {},
+    });
+  });
   push @jobstep, { 'level' => 0,
                   'failures' => 0,
                    'arvados_task' => $first_task,
@@ -356,137 +343,209 @@ if (!$have_slurm)
 
 
 my $build_script;
+do {
+  local $/ = undef;
+  $build_script = <DATA>;
+};
+my $nodelist = join(",", @node);
 
+if (!defined $no_clear_tmp) {
+  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
+  Log (undef, "Clean work dirs");
 
-$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
-
-my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
-if ($skip_install)
-{
-  if (!defined $no_clear_tmp) {
-    my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
-    system($clear_tmp_cmd) == 0
-       or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
-  }
-  $ENV{"CRUNCH_SRC"} = $Job->{script_version};
-  for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
-    if (-d $src_path) {
-      system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
-          or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
-      system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
-          == 0
-          or croak ("setup.py in $src_path failed: exit ".($?>>8));
-    }
+  my $cleanpid = fork();
+  if ($cleanpid == 0)
+  {
+    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+          ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
+    exit (1);
+  }
+  while (1)
+  {
+    last if $cleanpid == waitpid (-1, WNOHANG);
+    freeze_if_want_freeze ($cleanpid);
+    select (undef, undef, undef, 0.1);
   }
+  Log (undef, "Cleanup command exited ".exit_status_s($?));
 }
-else
-{
-  do {
-    local $/ = undef;
-    $build_script = <DATA>;
-  };
-  Log (undef, "Install revision ".$Job->{script_version});
-  my $nodelist = join(",", @node);
 
-  if (!defined $no_clear_tmp) {
-    # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
 
-    my $cleanpid = fork();
-    if ($cleanpid == 0)
-    {
-      srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
-      exit (1);
-    }
-    while (1)
-    {
-      last if $cleanpid == waitpid (-1, WNOHANG);
-      freeze_if_want_freeze ($cleanpid);
-      select (undef, undef, undef, 0.1);
-    }
-    Log (undef, "Clean-work-dir exited $?");
-  }
-
-  # Install requested code version
+my $git_archive;
+if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
+  # If script_version looks like an absolute path, *and* the --git-dir
+  # argument was not given -- which implies we were not invoked by
+  # crunch-dispatch -- we will use the given path as a working
+  # directory instead of resolving script_version to a git commit (or
+  # doing anything else with git).
+  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
+  $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
+}
+else {
+  # Resolve the given script_version to a git commit sha1. Also, if
+  # the repository is remote, clone it into our local filesystem: this
+  # ensures "git archive" will work, and is necessary to reliably
+  # resolve a symbolic script_version like "master^".
+  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
 
-  my @execargs;
-  my @srunargs = ("srun",
-                 "--nodelist=$nodelist",
-                 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
+  Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
 
   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
-  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
-
-  my $commit;
-  my $git_archive;
-  my $treeish = $Job->{'script_version'};
 
-  # If we're running under crunch-dispatch, it will have pulled the
-  # appropriate source tree into its own repository, and given us that
-  # repo's path as $git_dir. If we're running a "local" job, and a
-  # script_version was specified, it's up to the user to provide the
-  # full path to a local repository in Job->{repository}.
+  # If we're running under crunch-dispatch, it will have already
+  # pulled the appropriate source tree into its own repository, and
+  # given us that repo's path as $git_dir.
   #
-  # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
-  # git-archive --remote where appropriate.
+  # If we're running a "local" job, we might have to fetch content
+  # from a remote repository.
   #
-  # TODO: Accept a locally-hosted Arvados repository by name or
-  # UUID. Use arvados.v1.repositories.list or .get to figure out the
-  # appropriate fetch-url.
-  my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
-
+  # (Currently crunch-dispatch gives a local path with --git-dir, but
+  # we might as well accept URLs there too in case it changes its
+  # mind.)
+  my $repo = $git_dir || $Job->{'repository'};
+
+  # Repository can be remote or local. If remote, we'll need to fetch it
+  # to a local dir before doing `git log` et al.
+  my $repo_location;
+
+  if ($repo =~ m{://|^[^/]*:}) {
+    # $repo is a git url we can clone, like git:// or https:// or
+    # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
+    # not recognized here because distinguishing that from a local
+    # path is too fragile. If you really need something strange here,
+    # use the ssh:// form.
+    $repo_location = 'remote';
+  } elsif ($repo =~ m{^\.*/}) {
+    # $repo is a local path to a git index. We'll also resolve ../foo
+    # to ../foo/.git if the latter is a directory. To help
+    # disambiguate local paths from named hosted repositories, this
+    # form must be given as ./ or ../ if it's a relative path.
+    if (-d "$repo/.git") {
+      $repo = "$repo/.git";
+    }
+    $repo_location = 'local';
+  } else {
+    # $repo is none of the above. It must be the name of a hosted
+    # repository.
+    my $arv_repo_list = retry_op(sub {
+      $arv->{'repositories'}->{'list'}->execute(
+        'filters' => [['name','=',$repo]]);
+    });
+    my @repos_found = @{$arv_repo_list->{'items'}};
+    my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
+    if ($n_found > 0) {
+      Log(undef, "Repository '$repo' -> "
+          . join(", ", map { $_->{'uuid'} } @repos_found));
+    }
+    if ($n_found != 1) {
+      croak("Error: Found $n_found repositories with name '$repo'.");
+    }
+    $repo = $repos_found[0]->{'fetch_url'};
+    $repo_location = 'remote';
+  }
+  Log(undef, "Using $repo_location repository '$repo'");
   $ENV{"CRUNCH_SRC_URL"} = $repo;
 
-  if (-d "$repo/.git") {
-    # We were given a working directory, but we are only interested in
-    # the index.
-    $repo = "$repo/.git";
-  }
+  # Resolve given script_version (we'll call that $treeish here) to a
+  # commit sha1 ($commit).
+  my $treeish = $Job->{'script_version'};
+  my $commit;
+  if ($repo_location eq 'remote') {
+    # We minimize excess object-fetching by re-using the same bare
+    # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
+    # just keep adding remotes to it as needed.
+    my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
+    my $gitcmd = "git --git-dir=\Q$local_repo\E";
+
+    # Set up our local repo for caching remote objects, making
+    # archives, etc.
+    if (!-d $local_repo) {
+      make_path($local_repo) or croak("Error: could not create $local_repo");
+    }
+    # This works (exits 0 and doesn't delete fetched objects) even
+    # if $local_repo is already initialized:
+    `$gitcmd init --bare`;
+    if ($?) {
+      croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
+    }
+
+    # If $treeish looks like a hash (or abbrev hash) we look it up in
+    # our local cache first, since that's cheaper. (We don't want to
+    # do that with tags/branches though -- those change over time, so
+    # they should always be resolved by the remote repo.)
+    if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
+      # Hide stderr because it's normal for this to fail:
+      my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
+      if ($? == 0 &&
+          # Careful not to resolve a branch named abcdeff to commit 1234567:
+          $sha1 =~ /^$treeish/ &&
+          $sha1 =~ /^([0-9a-f]{40})$/s) {
+        $commit = $1;
+        Log(undef, "Commit $commit already present in $local_repo");
+      }
+    }
 
-  # If this looks like a subversion r#, look for it in git-svn commit messages
+    if (!defined $commit) {
+      # If $treeish isn't just a hash or abbrev hash, or isn't here
+      # yet, we need to fetch the remote to resolve it correctly.
 
-  if ($treeish =~ m{^\d{1,4}$}) {
-    my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
-    chomp $gitlog;
-    Log(undef, "git Subversion search exited $?");
-    if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
-      $commit = $gitlog;
-      Log(undef, "Using commit $commit for Subversion revision $treeish");
+      # First, remove all local heads. This prevents a name that does
+      # not exist on the remote from resolving to (or colliding with)
+      # a previously fetched branch or tag (possibly from a different
+      # remote).
+      remove_tree("$local_repo/refs/heads", {keep_root => 1});
+
+      Log(undef, "Fetching objects from $repo to $local_repo");
+      `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
+      if ($?) {
+        croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
+      }
     }
+
+    # Now that the data is all here, we will use our local repo for
+    # the rest of our git activities.
+    $repo = $local_repo;
   }
 
-  # If that didn't work, try asking git to look it up as a tree-ish.
-
-  if (!defined $commit) {
-    my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
-    chomp $found;
-    Log(undef, "git rev-list exited $? with result '$found'");
-    if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
-      $commit = $found;
-      Log(undef, "Using commit $commit for tree-ish $treeish");
-      if ($commit ne $treeish) {
-       # Make sure we record the real commit id in the database,
-       # frozentokey, logs, etc. -- instead of an abbreviation or a
-       # branch name which can become ambiguous or point to a
-       # different commit in the future.
-        $Job->{'script_version'} = $commit;
-        !$job_has_uuid or
-            $Job->update_attributes('script_version' => $commit) or
-            croak("Error while updating job");
-      }
+  my $gitcmd = "git --git-dir=\Q$repo\E";
+  my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
+  unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
+    croak("`$gitcmd rev-list` exited "
+          .exit_status_s($?)
+          .", '$treeish' not found. Giving up.");
+  }
+  $commit = $1;
+  Log(undef, "Version $treeish is commit $commit");
+
+  if ($commit ne $Job->{'script_version'}) {
+    # Record the real commit id in the database, frozentokey, logs,
+    # etc. -- instead of an abbreviation or a branch name which can
+    # become ambiguous or point to a different commit in the future.
+    if (!$Job->update_attributes('script_version' => $commit)) {
+      croak("Error: failed to update job's script_version attribute");
     }
   }
 
-  if (defined $commit) {
-    $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
-    @execargs = ("sh", "-c",
-                "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
-    $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
-    croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
+  $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
+  $git_archive = `$gitcmd archive ''\Q$commit\E`;
+  if ($?) {
+    croak("Error: $gitcmd archive exited ".exit_status_s($?));
   }
-  else {
-    croak ("could not figure out commit id for $treeish");
+}
+
+if (!defined $git_archive) {
+  Log(undef, "Skip install phase (no git archive)");
+  if ($have_slurm) {
+    Log(undef, "Warning: This probably means workers have no source tree!");
   }
+}
+else {
+  Log(undef, "Run install script on all workers");
+
+  my @srunargs = ("srun",
+                  "--nodelist=$nodelist",
+                  "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
+  my @execargs = ("sh", "-c",
+                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
 
   # Note: this section is almost certainly unnecessary if we're
   # running tasks in docker containers.
@@ -502,7 +561,7 @@ else
     freeze_if_want_freeze ($installpid);
     select (undef, undef, undef, 0.1);
   }
-  Log (undef, "Install exited $?");
+  Log (undef, "Install script exited ".exit_status_s($?));
 }
 
 if (!$have_slurm)
@@ -541,7 +600,8 @@ fi
   }
   if ($? != 0)
   {
-    croak("Installing Docker image from $docker_locator returned exit code $?");
+    croak("Installing Docker image from $docker_locator exited "
+          .exit_status_s($?));
   }
 }
 
@@ -859,12 +919,13 @@ else {
     while (my $manifest_line = <$orig_manifest>) {
       $orig_manifest_text .= $manifest_line;
     }
-    my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
-      'manifest_text' => $orig_manifest_text,
+    my $output = retry_op(sub {
+      $arv->{'collections'}->{'create'}->execute(
+        'collection' => {'manifest_text' => $orig_manifest_text});
     });
     Log(undef, "output uuid " . $output->{uuid});
     Log(undef, "output hash " . $output->{portable_data_hash});
-    $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
+    $Job->update_attributes('output' => $output->{portable_data_hash});
   };
   if ($@) {
     Log (undef, "Failed to register output manifest: $@");
@@ -875,13 +936,15 @@ Log (undef, "finish");
 
 save_meta();
 
-if ($job_has_uuid) {
-  $Job->update_attributes('running' => 0,
-                          'success' => $collated_output && $main::success,
-                          'finished_at' => scalar gmtime)
+my $final_state;
+if ($collated_output && $main::success) {
+  $final_state = 'Complete';
+} else {
+  $final_state = 'Failed';
 }
+$Job->update_attributes('state' => $final_state);
 
-exit ($Job->{'success'} ? 1 : 0);
+exit (($final_state eq 'Complete') ? 0 : 1);
 
 
 
@@ -896,9 +959,7 @@ sub update_progress_stats
   $Job->{'tasks_summary'}->{'todo'} = $todo;
   $Job->{'tasks_summary'}->{'done'} = $done;
   $Job->{'tasks_summary'}->{'running'} = $running;
-  if ($job_has_uuid) {
-    $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
-  }
+  $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
   Log (undef, "status: $done done, $running running, $todo todo");
   $progress_is_dirty = 0;
 }
@@ -919,10 +980,7 @@ sub reapchildren
 
   my $childstatus = $?;
   my $exitvalue = $childstatus >> 8;
-  my $exitinfo = sprintf("exit %d signal %d%s",
-                         $exitvalue,
-                         $childstatus & 127,
-                         ($childstatus & 128 ? ' core dump' : ''));
+  my $exitinfo = "exit ".exit_status_s($childstatus);
   $Jobstep->{'arvados_task'}->reload;
   my $task_success = $Jobstep->{'arvados_task'}->{success};
 
@@ -968,10 +1026,8 @@ sub reapchildren
       $main::success = 0;
       $main::please_freeze = 1;
     }
-    else {
-      # Put this task back on the todo queue
-      push @jobstep_todo, $jobstepid;
-    }
+    # Put this task back on the todo queue
+    push @jobstep_todo, $jobstepid;
     $Job->{'tasks_summary'}->{'failed'}++;
   }
   else
@@ -1000,13 +1056,15 @@ sub reapchildren
     my $newtask_list = [];
     my $newtask_results;
     do {
-      $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
-        'where' => {
-          'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-        },
-        'order' => 'qsequence',
-        'offset' => scalar(@$newtask_list),
-      );
+      $newtask_results = retry_op(sub {
+        $arv->{'job_tasks'}->{'list'}->execute(
+          'where' => {
+            'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+          },
+          'order' => 'qsequence',
+          'offset' => scalar(@$newtask_list),
+        );
+      });
       push(@$newtask_list, @{$newtask_results->{items}});
     } while (@{$newtask_results->{items}});
     foreach my $arvados_task (@$newtask_list) {
@@ -1029,19 +1087,23 @@ sub check_refresh_wanted
   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
   if (@stat && $stat[9] > $latest_refresh) {
     $latest_refresh = scalar time;
-    if ($job_has_uuid) {
-      my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
-      for my $attr ('cancelled_at',
-                    'cancelled_by_user_uuid',
-                    'cancelled_by_client_uuid') {
-        $Job->{$attr} = $Job2->{$attr};
-      }
-      if ($Job->{'cancelled_at'}) {
-        Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
-             " by user " . $Job->{cancelled_by_user_uuid});
-        $main::success = 0;
-        $main::please_freeze = 1;
+    my $Job2 = retry_op(sub {
+      $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+    });
+    for my $attr ('cancelled_at',
+                  'cancelled_by_user_uuid',
+                  'cancelled_by_client_uuid',
+                  'state') {
+      $Job->{$attr} = $Job2->{$attr};
+    }
+    if ($Job->{'state'} ne "Running") {
+      if ($Job->{'state'} eq "Cancelled") {
+        Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
+      } else {
+        Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
       }
+      $main::success = 0;
+      $main::please_freeze = 1;
     }
   }
 }
@@ -1208,7 +1270,7 @@ sub collate_output
 
   my ($child_out, $child_in);
   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
-                  '--retries', put_retry_count());
+                  '--retries', retry_count());
   my $joboutput;
   for (@jobstep)
   {
@@ -1243,10 +1305,13 @@ sub collate_output
     if ($s->can_read(120)) {
       sysread($child_out, $joboutput, 64 * 1024 * 1024);
       chomp($joboutput);
+      # TODO: Ensure exit status == 0.
     } else {
       Log (undef, "timed out reading from 'arv-put'");
     }
   }
+  # TODO: kill $pid instead of waiting, now that we've decided to
+  # ignore further output.
   waitpid($pid, 0);
 
   return $joboutput;
@@ -1294,6 +1359,73 @@ sub fhbits
 }
 
 
+# Send log output to Keep via arv-put.
+#
+# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_pid is the pid of the arv-put subprocess.
+#
+# The only functions that should access these variables directly are:
+#
+# log_writer_start($logfilename)
+#     Starts an arv-put pipe, reading data on stdin and writing it to
+#     a $logfilename file in an output collection.
+#
+# log_writer_send($txt)
+#     Writes $txt to the output log collection.
+#
+# log_writer_finish()
+#     Closes the arv-put pipe and returns the output that it produces.
+#
+# log_writer_is_active()
+#     Returns a true value if there is currently a live arv-put
+#     process, false otherwise.
+#
+my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+
+sub log_writer_start($)
+{
+  my $logfilename = shift;
+  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
+                        'arv-put', '--portable-data-hash',
+                        '--retries', '3',
+                        '--filename', $logfilename,
+                        '-');
+}
+
+sub log_writer_send($)
+{
+  my $txt = shift;
+  print $log_pipe_in $txt;
+}
+
+sub log_writer_finish()
+{
+  return unless $log_pipe_pid;
+
+  close($log_pipe_in);
+  my $arv_put_output;
+
+  my $s = IO::Select->new($log_pipe_out);
+  if ($s->can_read(120)) {
+    sysread($log_pipe_out, $arv_put_output, 1024);
+    chomp($arv_put_output);
+  } else {
+    Log (undef, "timed out reading from 'arv-put'");
+  }
+
+  waitpid($log_pipe_pid, 0);
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
+  if ($?) {
+    Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+  }
+
+  return $arv_put_output;
+}
+
+sub log_writer_is_active() {
+  return $log_pipe_pid;
+}
+
 sub Log                                # ($jobstep_id, $logmessage)
 {
   if ($_[1] =~ /\n/) {
@@ -1307,15 +1439,15 @@ sub Log                         # ($jobstep_id, $logmessage)
   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
   $message .= "\n";
   my $datetime;
-  if ($local_logfile || -t STDERR) {
+  if (log_writer_is_active() || -t STDERR) {
     my @gmtime = gmtime;
     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
                         $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
-  if ($local_logfile) {
-    print $local_logfile $datetime . " " . $message;
+  if (log_writer_is_active()) {
+    log_writer_send($datetime . " " . $message);
   }
 }
 
@@ -1328,17 +1460,19 @@ sub croak
   freeze() if @jobstep_todo;
   collate_output() if @jobstep_todo;
   cleanup();
-  save_meta() if $local_logfile;
+  save_meta();
   die;
 }
 
 
 sub cleanup
 {
-  return if !$job_has_uuid;
-  $Job->update_attributes('running' => 0,
-                          'success' => 0,
-                          'finished_at' => scalar gmtime);
+  return unless $Job;
+  if ($Job->{'state'} eq 'Cancelled') {
+    $Job->update_attributes('finished_at' => scalar gmtime);
+  } else {
+    $Job->update_attributes('state' => 'Failed');
+  }
 }
 
 
@@ -1346,19 +1480,12 @@ sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
+  return unless log_writer_is_active();
 
-  $local_logfile->flush;
-  my $retry_count = put_retry_count();
-  my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
-      "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
-  my $loglocator = `$cmd`;
-  die "system $cmd failed: $?" if $?;
-  chomp($loglocator);
-
-  $local_logfile = undef;   # the temp file is automatically deleted
+  my $loglocator = log_writer_finish();
   Log (undef, "log manifest is $loglocator");
   $Job->{'log'} = $loglocator;
-  $Job->update_attributes('log', $loglocator) if $job_has_uuid;
+  $Job->update_attributes('log', $loglocator);
 }
 
 
@@ -1386,7 +1513,7 @@ sub freeze_if_want_freeze
     collate_output();
     cleanup();
     save_meta();
-    exit 0;
+    exit 1;
   }
 }
 
@@ -1475,7 +1602,10 @@ sub find_docker_image {
   # If not, return undef for both values.
   my $locator = shift;
   my ($streamname, $filename);
-  if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
+  my $image = retry_op(sub {
+    $arv->{collections}->{get}->execute(uuid => $locator);
+  });
+  if ($image) {
     foreach my $line (split(/\n/, $image->{manifest_text})) {
       my @tokens = split(/\s+/, $line);
       next if (!@tokens);
@@ -1496,20 +1626,66 @@ sub find_docker_image {
   }
 }
 
-sub put_retry_count {
-  # Calculate a --retries argument for arv-put that will have it try
-  # approximately as long as this Job has been running.
-  my $stoptime = shift || time;
-  my $starttime = $jobstep[0]->{starttime};
-  my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
-  my $retries = 0;
-  while ($timediff >= 2) {
-    $retries++;
-    $timediff /= 2;
+sub retry_count {
+  # Calculate the number of times an operation should be retried,
+  # assuming exponential backoff, and that we're willing to retry as
+  # long as tasks have been running.  Enforce a minimum of 3 retries.
+  my ($starttime, $endtime, $timediff, $retries);
+  if (@jobstep) {
+    $starttime = $jobstep[0]->{starttime};
+    $endtime = $jobstep[-1]->{finishtime};
+  }
+  if (!defined($starttime)) {
+    $timediff = 0;
+  } elsif (!defined($endtime)) {
+    $timediff = time - $starttime;
+  } else {
+    $timediff = ($endtime - $starttime) - (time - $endtime);
+  }
+  if ($timediff > 0) {
+    $retries = int(log($timediff) / log(2));
+  } else {
+    $retries = 1;  # Use the minimum.
   }
   return ($retries > 3) ? $retries : 3;
 }
 
+sub retry_op {
+  # Given a function reference, call it with the remaining arguments.  If
+  # it dies, retry it with exponential backoff until it succeeds, or until
+  # the current retry_count is exhausted.
+  my $operation = shift;
+  my $retries = retry_count();
+  foreach my $try_count (0..$retries) {
+    my $next_try = time + (2 ** $try_count);
+    my $result = eval { $operation->(@_); };
+    if (!$@) {
+      return $result;
+    } elsif ($try_count < $retries) {
+      my $sleep_time = $next_try - time;
+      sleep($sleep_time) if ($sleep_time > 0);
+    }
+  }
+  # Ensure the error message ends in a newline, so Perl doesn't add
+  # retry_op's line number to it.
+  chomp($@);
+  die($@ . "\n");
+}
+
+sub exit_status_s {
+  # Given a $?, return a human-readable exit code string like "0" or
+  # "1" or "0 with signal 1" or "1 with signal 11".
+  my $exitcode = shift;
+  my $s = $exitcode >> 8;
+  if ($exitcode & 0x7f) {
+    $s .= " with signal " . ($exitcode & 0x7f);
+  }
+  if ($exitcode & 0x80) {
+    $s .= " with core dump";
+  }
+  return $s;
+}
+
 __DATA__
 #!/usr/bin/perl