Use a simple 'open' for fetch_block (freeing up an extra filehandle). refs #2325...
[arvados.git] / sdk / cli / bin / crunch-job
index e23492aba4b3766ae258447ab359616e14548f47..2f8623ba66490f4cf930714473b43a05b4e3215f 100755 (executable)
@@ -33,6 +33,12 @@ Path to .git directory where the specified commit is found.
 
 Arvados API authorization token to use during the course of the job.
 
 
 Arvados API authorization token to use during the course of the job.
 
+=item --no-clear-tmp
+
+Do not clear per-job/task temporary directories during initial job
+setup. This can speed up development and debugging when running jobs
+locally.
+
 =back
 
 =head1 RUNNING JOBS LOCALLY
 =back
 
 =head1 RUNNING JOBS LOCALLY
@@ -58,7 +64,8 @@ Save a checkpoint and continue.
 =item SIGHUP
 
 Refresh node allocation (i.e., check whether any nodes have been added
 =item SIGHUP
 
 Refresh node allocation (i.e., check whether any nodes have been added
-or unallocated). Currently this is a no-op.
+or unallocated) and attributes of the Job record that should affect
+behavior (e.g., cancel job if cancelled_at becomes non-nil).
 
 =back
 
 
 =back
 
@@ -70,17 +77,21 @@ use POSIX ':sys_wait_h';
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Arvados;
 use Getopt::Long;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Arvados;
 use Getopt::Long;
-use Warehouse;
-use Warehouse::Stream;
-use IPC::System::Simple qw(capturex);
+use IPC::Open2;
+use IO::Select;
+use File::Temp;
+use Fcntl ':flock';
 
 $ENV{"TMPDIR"} ||= "/tmp";
 
 $ENV{"TMPDIR"} ||= "/tmp";
-$ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
-if ($ENV{"USER"} ne "crunch" && $< != 0) {
-  # use a tmp dir unique for my uid
-  $ENV{"CRUNCH_TMP"} .= "-$<";
+unless (defined $ENV{"CRUNCH_TMP"}) {
+  $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
+  if ($ENV{"USER"} ne "crunch" && $< != 0) {
+    # use a tmp dir unique for my uid
+    $ENV{"CRUNCH_TMP"} .= "-$<";
+  }
 }
 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
 }
 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
+$ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
 mkdir ($ENV{"JOB_WORK"});
 
 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
 mkdir ($ENV{"JOB_WORK"});
 
@@ -88,11 +99,13 @@ my $force_unlock;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
+my $no_clear_tmp;
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
+           'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
     );
 
            'resume-stash=s' => \$resume_stash,
     );
 
@@ -105,10 +118,6 @@ my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
 my $local_job = !$job_has_uuid;
 
 
 my $local_job = !$job_has_uuid;
 
 
-$SIG{'HUP'} = sub
-{
-  1;
-};
 $SIG{'USR1'} = sub
 {
   $main::ENV{CRUNCH_DEBUG} = 1;
 $SIG{'USR1'} = sub
 {
   $main::ENV{CRUNCH_DEBUG} = 1;
@@ -120,10 +129,8 @@ $SIG{'USR2'} = sub
 
 
 
 
 
 
-my $arv = Arvados->new;
-my $metastream = Warehouse::Stream->new(whc => new Warehouse);
-$metastream->clear;
-$metastream->write_start('log.txt');
+my $arv = Arvados->new('apiVersion' => 'v1');
+my $metastream;
 
 my $User = $arv->{'users'}->{'current'}->execute;
 
 
 my $User = $arv->{'users'}->{'current'}->execute;
 
@@ -168,7 +175,8 @@ else
 }
 $job_id = $Job->{'uuid'};
 
 }
 $job_id = $Job->{'uuid'};
 
-
+my $keep_logfile = $job_id . '.log.txt';
+my $local_logfile = File::Temp->new();
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
@@ -255,20 +263,17 @@ my $jobmanager_id;
 if ($job_has_uuid)
 {
   # Claim this job, and make sure nobody else does
 if ($job_has_uuid)
 {
   # Claim this job, and make sure nobody else does
-
-  $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
-  $Job->{'started_at'} = gmtime;
-  $Job->{'running'} = 1;
-  $Job->{'success'} = undef;
-  $Job->{'tasks_summary'} = { 'failed' => 0,
-                              'todo' => 1,
-                              'running' => 0,
-                              'done' => 0 };
-  if ($job_has_uuid) {
-    unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
-      croak("Error while updating / locking job");
-    }
+  unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
+          $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
+    croak("Error while updating / locking job");
   }
   }
+  $Job->update_attributes('started_at' => scalar gmtime,
+                          'running' => 1,
+                          'success' => undef,
+                          'tasks_summary' => { 'failed' => 0,
+                                               'todo' => 1,
+                                               'running' => 0,
+                                               'done' => 0 });
 }
 
 
 }
 
 
@@ -279,9 +284,12 @@ $SIG{'TERM'} = \&croak;
 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
 $SIG{'ALRM'} = sub { $main::please_info = 1; };
 $SIG{'CONT'} = sub { $main::please_continue = 1; };
 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
 $SIG{'ALRM'} = sub { $main::please_info = 1; };
 $SIG{'CONT'} = sub { $main::please_continue = 1; };
+$SIG{'HUP'} = sub { $main::please_refresh = 1; };
+
 $main::please_freeze = 0;
 $main::please_info = 0;
 $main::please_continue = 0;
 $main::please_freeze = 0;
 $main::please_info = 0;
 $main::please_continue = 0;
+$main::please_refresh = 0;
 my $jobsteps_must_output_keys = 0;     # becomes 1 when any task outputs a key
 
 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
 my $jobsteps_must_output_keys = 0;     # becomes 1 when any task outputs a key
 
 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
@@ -297,6 +305,7 @@ my $jobstep_tomerge_level = 0;
 my $squeue_checked;
 my $squeue_kill_checked;
 my $output_in_keep = 0;
 my $squeue_checked;
 my $squeue_kill_checked;
 my $output_in_keep = 0;
+my $latest_refresh = scalar time;
 
 
 
 
 
 
@@ -313,13 +322,19 @@ else
     'parameters' => {},
                                                           });
   push @jobstep, { 'level' => 0,
     'parameters' => {},
                                                           });
   push @jobstep, { 'level' => 0,
-                  'attempts' => 0,
+                  'failures' => 0,
                    'arvados_task' => $first_task,
                 };
   push @jobstep_todo, 0;
 }
 
 
                    'arvados_task' => $first_task,
                 };
   push @jobstep_todo, 0;
 }
 
 
+if (!$have_slurm)
+{
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
+
 my $build_script;
 
 
 my $build_script;
 
 
@@ -328,7 +343,21 @@ $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
+  if (!defined $no_clear_tmp) {
+    my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
+    system($clear_tmp_cmd) == 0
+       or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
+  }
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
+  for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
+    if (-d $src_path) {
+      system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
+          or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
+      system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
+          == 0
+          or croak ("setup.py in $src_path failed: exit ".($?>>8));
+    }
+  }
 }
 else
 {
 }
 else
 {
@@ -339,22 +368,24 @@ else
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
-  # Clean out crunch_tmp/work and crunch_tmp/opt
+  if (!defined $no_clear_tmp) {
+    # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
 
 
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-         ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
+    my $cleanpid = fork();
+    if ($cleanpid == 0)
+    {
+      srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
+      exit (1);
+    }
+    while (1)
+    {
+      last if $cleanpid == waitpid (-1, WNOHANG);
+      freeze_if_want_freeze ($cleanpid);
+      select (undef, undef, undef, 0.1);
+    }
+    Log (undef, "Clean-work-dir exited $?");
   }
   }
-  Log (undef, "Clean-work-dir exited $?");
 
   # Install requested code version
 
 
   # Install requested code version
 
@@ -365,7 +396,6 @@ else
 
   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
 
   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
-  $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
 
   my $commit;
   my $git_archive;
 
   my $commit;
   my $git_archive;
@@ -419,7 +449,9 @@ else
        Log (undef, "Using commit $commit for tree-ish $treeish");
         if ($commit ne $treeish) {
           $Job->{'script_version'} = $commit;
        Log (undef, "Using commit $commit for tree-ish $treeish");
         if ($commit ne $treeish) {
           $Job->{'script_version'} = $commit;
-          !$job_has_uuid or $Job->save() or croak("Error while updating job");
+          !$job_has_uuid or
+              $Job->update_attributes('script_version' => $commit) or
+              croak("Error while updating job");
         }
       }
     }
         }
       }
     }
@@ -450,6 +482,12 @@ else
   Log (undef, "Install exited $?");
 }
 
   Log (undef, "Install exited $?");
 }
 
+if (!$have_slurm)
+{
+  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
@@ -465,7 +503,7 @@ foreach (split (/\n/, $Job->{knobs}))
 
 
 
 
 
 
-my $success;
+$main::success = undef;
 
 
 
 
 
 
@@ -496,20 +534,12 @@ update_progress_stats();
 THISROUND:
 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 {
 THISROUND:
 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 {
-  $main::please_continue = 0;
-
   my $id = $jobstep_todo[$todo_ptr];
   my $Jobstep = $jobstep[$id];
   if ($Jobstep->{level} != $level)
   {
     next;
   }
   my $id = $jobstep_todo[$todo_ptr];
   my $Jobstep = $jobstep[$id];
   if ($Jobstep->{level} != $level)
   {
     next;
   }
-  if ($Jobstep->{attempts} > 9)
-  {
-    Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
-    $success = 0;
-    last THISROUND;
-  }
 
   pipe $reader{$id}, "writer" or croak ($!);
   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
 
   pipe $reader{$id}, "writer" or croak ($!);
   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
@@ -549,9 +579,11 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
-    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
+    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
+    $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+    $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
 
     $ENV{"GZIP"} = "-n";
 
 
     $ENV{"GZIP"} = "-n";
 
@@ -564,7 +596,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     my @execargs = qw(sh);
     my $build_script_to_send = "";
     my $command =
     my @execargs = qw(sh);
     my $build_script_to_send = "";
     my $command =
-       "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
+       "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
+        ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
        ."&& cd $ENV{CRUNCH_TMP} ";
     if ($build_script)
     {
        ."&& cd $ENV{CRUNCH_TMP} ";
     if ($build_script)
     {
@@ -572,14 +605,11 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       $command .=
          "&& perl -";
     }
       $command .=
          "&& perl -";
     }
-    $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
-    $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
-    $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
     $command .=
     $command .=
-        "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+        "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     my @execargs = ('bash', '-c', $command);
     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
     my @execargs = ('bash', '-c', $command);
     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
-    exit (1);
+    exit (111);
   }
   close("writer");
   if (!defined $childpid)
   }
   close("writer");
   if (!defined $childpid)
@@ -599,7 +629,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 
   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
   Log ($id, "child $childpid started on $childslotname");
 
   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
   Log ($id, "child $childpid started on $childslotname");
-  $Jobstep->{attempts} ++;
   $Jobstep->{starttime} = time;
   $Jobstep->{node} = $childnode->{name};
   $Jobstep->{slotindex} = $childslot;
   $Jobstep->{starttime} = time;
   $Jobstep->{node} = $childnode->{name};
   $Jobstep->{slotindex} = $childslot;
@@ -629,6 +658,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
        + reapchildren ();
     if (!$gotsome)
     {
        + reapchildren ();
     if (!$gotsome)
     {
+      check_refresh_wanted();
       check_squeue();
       update_progress_stats();
       select (undef, undef, undef, 0.1);
       check_squeue();
       update_progress_stats();
       select (undef, undef, undef, 0.1);
@@ -677,11 +707,15 @@ map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
 while (%proc)
 {
 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
 while (%proc)
 {
-  goto THISROUND if $main::please_continue;
+  if ($main::please_continue) {
+    $main::please_continue = 0;
+    goto THISROUND;
+  }
   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
   readfrompipes ();
   if (!reapchildren())
   {
   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
   readfrompipes ();
   if (!reapchildren())
   {
+    check_refresh_wanted();
     check_squeue();
     update_progress_stats();
     select (undef, undef, undef, 0.1);
     check_squeue();
     update_progress_stats();
     select (undef, undef, undef, 0.1);
@@ -693,7 +727,7 @@ update_progress_stats();
 freeze_if_want_freeze();
 
 
 freeze_if_want_freeze();
 
 
-if (!defined $success)
+if (!defined $main::success)
 {
   if (@jobstep_todo &&
       $thisround_succeeded == 0 &&
 {
   if (@jobstep_todo &&
       $thisround_succeeded == 0 &&
@@ -701,30 +735,30 @@ if (!defined $success)
   {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
   {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
-    $success = 0;
+    $main::success = 0;
   }
   if (!@jobstep_todo)
   {
   }
   if (!@jobstep_todo)
   {
-    $success = 1;
+    $main::success = 1;
   }
 }
 
   }
 }
 
-goto ONELEVEL if !defined $success;
+goto ONELEVEL if !defined $main::success;
 
 
 release_allocation();
 freeze();
 
 
 release_allocation();
 freeze();
-$Job->reload;
-$Job->{'output'} = &collate_output();
-$Job->{'running'} = 0;
-$Job->{'success'} = $Job->{'output'} && $success;
-$Job->{'finished_at'} = gmtime;
-$Job->save if $job_has_uuid;
+if ($job_has_uuid) {
+  $Job->update_attributes('output' => &collate_output(),
+                          'running' => 0,
+                          'success' => $Job->{'output'} && $main::success,
+                          'finished_at' => scalar gmtime)
+}
 
 if ($Job->{'output'})
 {
   eval {
 
 if ($Job->{'output'})
 {
   eval {
-    my $manifest_text = capturex("whget", $Job->{'output'});
+    my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
@@ -753,7 +787,9 @@ sub update_progress_stats
   $Job->{'tasks_summary'}->{'todo'} = $todo;
   $Job->{'tasks_summary'}->{'done'} = $done;
   $Job->{'tasks_summary'}->{'running'} = $running;
   $Job->{'tasks_summary'}->{'todo'} = $todo;
   $Job->{'tasks_summary'}->{'done'} = $done;
   $Job->{'tasks_summary'}->{'running'} = $running;
-  $Job->save if $job_has_uuid;
+  if ($job_has_uuid) {
+    $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
+  }
   Log (undef, "status: $done done, $running running, $todo todo");
   $progress_is_dirty = 0;
 }
   Log (undef, "status: $done done, $running running, $todo todo");
   $progress_is_dirty = 0;
 }
@@ -772,27 +808,32 @@ sub reapchildren
   my $elapsed = time - $proc{$pid}->{time};
   my $Jobstep = $jobstep[$jobstepid];
 
   my $elapsed = time - $proc{$pid}->{time};
   my $Jobstep = $jobstep[$jobstepid];
 
-  my $exitcode = $?;
-  my $exitinfo = "exit $exitcode";
+  my $childstatus = $?;
+  my $exitvalue = $childstatus >> 8;
+  my $exitinfo = sprintf("exit %d signal %d%s",
+                         $exitvalue,
+                         $childstatus & 127,
+                         ($childstatus & 128 ? ' core dump' : ''));
   $Jobstep->{'arvados_task'}->reload;
   $Jobstep->{'arvados_task'}->reload;
-  my $success = $Jobstep->{'arvados_task'}->{success};
+  my $task_success = $Jobstep->{'arvados_task'}->{success};
 
 
-  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
+  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
 
 
-  if (!defined $success) {
+  if (!defined $task_success) {
     # task did not indicate one way or the other --> fail
     $Jobstep->{'arvados_task'}->{success} = 0;
     $Jobstep->{'arvados_task'}->save;
     # task did not indicate one way or the other --> fail
     $Jobstep->{'arvados_task'}->{success} = 0;
     $Jobstep->{'arvados_task'}->save;
-    $success = 0;
+    $task_success = 0;
   }
 
   }
 
-  if (!$success)
+  if (!$task_success)
   {
   {
-    my $no_incr_attempts;
-    $no_incr_attempts = 1 if $Jobstep->{node_fail};
+    my $temporary_fail;
+    $temporary_fail ||= $Jobstep->{node_fail};
+    $temporary_fail ||= ($exitvalue == 111);
 
     ++$thisround_failed;
 
     ++$thisround_failed;
-    ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
+    ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
 
     # Check for signs of a failed or misconfigured node
     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
 
     # Check for signs of a failed or misconfigured node
     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
@@ -800,19 +841,28 @@ sub reapchildren
       # Don't count this against jobstep failure thresholds if this
       # node is already suspected faulty and srun exited quickly
       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
       # Don't count this against jobstep failure thresholds if this
       # node is already suspected faulty and srun exited quickly
       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
-         $elapsed < 5 &&
-         $Jobstep->{attempts} > 1) {
-       Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
-        $no_incr_attempts = 1;
-       --$Jobstep->{attempts};
+         $elapsed < 5) {
+       Log ($jobstepid, "blaming failure on suspect node " .
+             $slot[$proc{$pid}->{slot}]->{node}->{name});
+        $temporary_fail ||= 1;
       }
       ban_node_by_slot($proc{$pid}->{slot});
     }
 
       }
       ban_node_by_slot($proc{$pid}->{slot});
     }
 
-    push @jobstep_todo, $jobstepid;
-    Log ($jobstepid, "failure in $elapsed seconds");
+    Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
+                             ++$Jobstep->{'failures'},
+                             $temporary_fail ? 'temporary ' : 'permanent',
+                             $elapsed));
 
 
-    --$Jobstep->{attempts} if $no_incr_attempts;
+    if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+      # Give up on this task, and the whole job
+      $main::success = 0;
+      $main::please_freeze = 1;
+    }
+    else {
+      # Put this task back on the todo queue
+      push @jobstep_todo, $jobstepid;
+    }
     $Job->{'tasks_summary'}->{'failed'}++;
   }
   else
     $Job->{'tasks_summary'}->{'failed'}++;
   }
   else
@@ -823,9 +873,9 @@ sub reapchildren
     push @jobstep_done, $jobstepid;
     Log ($jobstepid, "success in $elapsed seconds");
   }
     push @jobstep_done, $jobstepid;
     Log ($jobstepid, "success in $elapsed seconds");
   }
-  $Jobstep->{exitcode} = $exitcode;
+  $Jobstep->{exitcode} = $childstatus;
   $Jobstep->{finishtime} = time;
   $Jobstep->{finishtime} = time;
-  process_stderr ($jobstepid, $success);
+  process_stderr ($jobstepid, $task_success);
   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
 
   close $reader{$jobstepid};
   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
 
   close $reader{$jobstepid};
@@ -844,7 +894,7 @@ sub reapchildren
   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
     my $jobstep = {
       'level' => $arvados_task->{'sequence'},
   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
     my $jobstep = {
       'level' => $arvados_task->{'sequence'},
-      'attempts' => 0,
+      'failures' => 0,
       'arvados_task' => $arvados_task
     };
     push @jobstep, $jobstep;
       'arvados_task' => $arvados_task
     };
     push @jobstep, $jobstep;
@@ -855,6 +905,27 @@ sub reapchildren
   1;
 }
 
   1;
 }
 
+sub check_refresh_wanted
+{
+  my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
+  if (@stat && $stat[9] > $latest_refresh) {
+    $latest_refresh = scalar time;
+    if ($job_has_uuid) {
+      my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+      for my $attr ('cancelled_at',
+                    'cancelled_by_user_uuid',
+                    'cancelled_by_client_uuid') {
+        $Job->{$attr} = $Job2->{$attr};
+      }
+      if ($Job->{'cancelled_at'}) {
+        Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
+             " by user " . $Job->{cancelled_by_user_uuid});
+        $main::success = 0;
+        $main::please_freeze = 1;
+      }
+    }
+  }
+}
 
 sub check_squeue
 {
 
 sub check_squeue
 {
@@ -964,7 +1035,7 @@ sub preprocess_stderr
     my $line = $1;
     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
     Log ($job, "stderr $line");
     my $line = $1;
     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
     Log ($job, "stderr $line");
-    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
+    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
       # whoa.
       $main::please_freeze = 1;
     }
       # whoa.
       $main::please_freeze = 1;
     }
@@ -979,7 +1050,7 @@ sub preprocess_stderr
 sub process_stderr
 {
   my $job = shift;
 sub process_stderr
 {
   my $job = shift;
-  my $success = shift;
+  my $task_success = shift;
   preprocess_stderr ($job);
 
   map {
   preprocess_stderr ($job);
 
   map {
@@ -987,12 +1058,24 @@ sub process_stderr
   } split ("\n", $jobstep[$job]->{stderr});
 }
 
   } split ("\n", $jobstep[$job]->{stderr});
 }
 
+sub fetch_block
+{
+  my $hash = shift;
+  my ($child_out, $output_block);
+
+  my $cmd = "arv keep get \Q$hash\E";
+  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!"
+  sysread($keep, $output_block, 64 * 1024 * 1024);
+  close $keep;
+  return $output_block;
+}
 
 sub collate_output
 {
 
 sub collate_output
 {
-  my $whc = Warehouse->new;
   Log (undef, "collate");
   Log (undef, "collate");
-  $whc->write_start (1);
+
+  my ($child_out, $child_in);
+  my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
   my $joboutput;
   for (@jobstep)
   {
   my $joboutput;
   for (@jobstep)
   {
@@ -1003,31 +1086,40 @@ sub collate_output
     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
     {
       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
     {
       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($output);
+      print $child_in $output;
     }
     elsif (@jobstep == 1)
     {
       $joboutput = $output;
     }
     elsif (@jobstep == 1)
     {
       $joboutput = $output;
-      $whc->write_finish;
+      last;
     }
     }
-    elsif (defined (my $outblock = $whc->fetch_block ($output)))
+    elsif (defined (my $outblock = fetch_block ($output)))
     {
       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
     {
       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($outblock);
+      print $child_in $outblock;
     }
     else
     {
     }
     else
     {
-      my $errstr = $whc->errstr;
-      $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
-      $success = 0;
+      Log (undef, "XXX fetch_block($output) failed XXX");
+      $main::success = 0;
+    }
+  }
+  $child_in->close;
+
+  if (!defined $joboutput) {
+    my $s = IO::Select->new($child_out);
+    if ($s->can_read(120)) {
+      sysread($child_out, $joboutput, 64 * 1024 * 1024);
+    } else {
+      Log (undef, "timed out reading from 'arv keep put'");
     }
   }
     }
   }
-  $joboutput = $whc->write_finish if !defined $joboutput;
+  waitpid($pid, 0);
+
   if ($joboutput)
   {
     Log (undef, "output $joboutput");
   if ($joboutput)
   {
     Log (undef, "output $joboutput");
-    $Job->{'output'} = $joboutput;
-    $Job->save if $job_has_uuid;
+    $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
   }
   else
   {
   }
   else
   {
@@ -1098,8 +1190,9 @@ sub Log                           # ($jobstep_id, $logmessage)
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
-  return if !$metastream;
-  $metastream->write_data ($datetime . " " . $message);
+  if ($metastream) {
+    print $metastream $datetime . " " . $message;
+  }
 }
 
 
 }
 
 
@@ -1119,25 +1212,27 @@ sub croak
 sub cleanup
 {
   return if !$job_has_uuid;
 sub cleanup
 {
   return if !$job_has_uuid;
-  $Job->reload;
-  $Job->{'running'} = 0;
-  $Job->{'success'} = 0;
-  $Job->{'finished_at'} = gmtime;
-  $Job->save;
+  $Job->update_attributes('running' => 0,
+                          'success' => 0,
+                          'finished_at' => scalar gmtime);
 }
 
 
 sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
 }
 
 
 sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
-  my $m = $metastream;
-  $m = $m->copy if $justcheckpoint;
-  $m->write_finish;
-  my $loglocator = $m->as_key;
-  undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
-  Log (undef, "meta key is $loglocator");
+  return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
+
+  $local_logfile->flush;
+  my $cmd = "arv keep put --filename \Q$keep_logfile\E "
+      . quotemeta($local_logfile->filename);
+  my $loglocator = `$cmd`;
+  die "system $cmd failed: $?" if $?;
+
+  $local_logfile = undef;   # the temp file is automatically deleted
+  Log (undef, "log manifest is $loglocator");
   $Job->{'log'} = $loglocator;
   $Job->{'log'} = $loglocator;
-  $Job->save if $job_has_uuid;
+  $Job->update_attributes('log', $loglocator) if $job_has_uuid;
 }
 
 
 }
 
 
@@ -1180,65 +1275,6 @@ sub freeze
 sub thaw
 {
   croak ("Thaw not implemented");
 sub thaw
 {
   croak ("Thaw not implemented");
-
-  my $whc;
-  my $key = shift;
-  Log (undef, "thaw from $key");
-
-  @jobstep = ();
-  @jobstep_done = ();
-  @jobstep_todo = ();
-  @jobstep_tomerge = ();
-  $jobstep_tomerge_level = 0;
-  my $frozenjob = {};
-
-  my $stream = new Warehouse::Stream ( whc => $whc,
-                                      hash => [split (",", $key)] );
-  $stream->rewind;
-  while (my $dataref = $stream->read_until (undef, "\n\n"))
-  {
-    if ($$dataref =~ /^job /)
-    {
-      foreach (split ("\n", $$dataref))
-      {
-       my ($k, $v) = split ("=", $_, 2);
-       $frozenjob->{$k} = freezeunquote ($v);
-      }
-      next;
-    }
-
-    if ($$dataref =~ /^merge (\d+) (.*)/)
-    {
-      $jobstep_tomerge_level = $1;
-      @jobstep_tomerge
-         = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
-      next;
-    }
-
-    my $Jobstep = { };
-    foreach (split ("\n", $$dataref))
-    {
-      my ($k, $v) = split ("=", $_, 2);
-      $Jobstep->{$k} = freezeunquote ($v) if $k;
-    }
-    $Jobstep->{attempts} = 0;
-    push @jobstep, $Jobstep;
-
-    if ($Jobstep->{exitcode} eq "0")
-    {
-      push @jobstep_done, $#jobstep;
-    }
-    else
-    {
-      push @jobstep_todo, $#jobstep;
-    }
-  }
-
-  foreach (qw (script script_version script_parameters))
-  {
-    $Job->{$_} = $frozenjob->{$_};
-  }
-  $Job->save if $job_has_uuid;
 }
 
 
 }
 
 
@@ -1298,6 +1334,15 @@ sub ban_node_by_slot {
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
+sub must_lock_now
+{
+  my ($lockfile, $error_message) = @_;
+  open L, ">", $lockfile or croak("$lockfile: $!");
+  if (!flock L, LOCK_EX|LOCK_NB) {
+    croak("Can't lock $lockfile: $error_message\n");
+  }
+}
+
 __DATA__
 #!/usr/bin/perl
 
 __DATA__
 #!/usr/bin/perl
 
@@ -1311,7 +1356,7 @@ my $repo = $ENV{"CRUNCH_SRC_URL"};
 
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
 
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
-if (readlink ("$destdir.commit") eq $commit) {
+if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
     exit 0;
 }
 
     exit 0;
 }
 
@@ -1320,16 +1365,27 @@ open STDOUT, ">", "$destdir.log";
 open STDERR, ">&STDOUT";
 
 mkdir $destdir;
 open STDERR, ">&STDOUT";
 
 mkdir $destdir;
-open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
-print TARX <DATA>;
-if(!close(TARX)) {
-  die "'tar -C $destdir -xf -' exited $?: $!";
+my @git_archive_data = <DATA>;
+if (@git_archive_data) {
+  open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
+  print TARX @git_archive_data;
+  if(!close(TARX)) {
+    die "'tar -C $destdir -xf -' exited $?: $!";
+  }
 }
 
 my $pwd;
 chomp ($pwd = `pwd`);
 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
 mkdir $install_dir;
 }
 
 my $pwd;
 chomp ($pwd = `pwd`);
 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
 mkdir $install_dir;
+
+for my $src_path ("$destdir/arvados/sdk/python") {
+  if (-d $src_path) {
+    shell_or_die ("virtualenv", $install_dir);
+    shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
+  }
+}
+
 if (-e "$destdir/crunch_scripts/install") {
     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
 if (-e "$destdir/crunch_scripts/install") {
     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {