Add/fix some shell quoting.
[arvados.git] / sdk / cli / bin / crunch-job
index 20b65af998297037e14ff45f5943921d6d865fc5..25c1ee0857e521a898fe7711af01e44db7ee9ccf 100755 (executable)
@@ -1,5 +1,5 @@
 #!/usr/bin/perl
 #!/usr/bin/perl
-# -*- mode: perl; perl-indent-level: 2; -*-
+# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
 
 =head1 NAME
 
 
 =head1 NAME
 
@@ -33,6 +33,12 @@ Path to .git directory where the specified commit is found.
 
 Arvados API authorization token to use during the course of the job.
 
 
 Arvados API authorization token to use during the course of the job.
 
+=item --no-clear-tmp
+
+Do not clear per-job/task temporary directories during initial job
+setup. This can speed up development and debugging when running jobs
+locally.
+
 =back
 
 =head1 RUNNING JOBS LOCALLY
 =back
 
 =head1 RUNNING JOBS LOCALLY
@@ -58,7 +64,8 @@ Save a checkpoint and continue.
 =item SIGHUP
 
 Refresh node allocation (i.e., check whether any nodes have been added
 =item SIGHUP
 
 Refresh node allocation (i.e., check whether any nodes have been added
-or unallocated). Currently this is a no-op.
+or unallocated) and attributes of the Job record that should affect
+behavior (e.g., cancel job if cancelled_at becomes non-nil).
 
 =back
 
 
 =back
 
@@ -70,9 +77,10 @@ use POSIX ':sys_wait_h';
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Arvados;
 use Getopt::Long;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Arvados;
 use Getopt::Long;
-use Warehouse;
-use Warehouse::Stream;
-use IPC::System::Simple qw(capturex);
+use IPC::Open2;
+use IO::Select;
+use File::Temp;
+use Fcntl ':flock';
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -83,6 +91,7 @@ unless (defined $ENV{"CRUNCH_TMP"}) {
   }
 }
 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
   }
 }
 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
+$ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
 mkdir ($ENV{"JOB_WORK"});
 
 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
 mkdir ($ENV{"JOB_WORK"});
 
@@ -90,11 +99,13 @@ my $force_unlock;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
+my $no_clear_tmp;
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
+           'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
     );
 
            'resume-stash=s' => \$resume_stash,
     );
 
@@ -107,10 +118,6 @@ my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
 my $local_job = !$job_has_uuid;
 
 
 my $local_job = !$job_has_uuid;
 
 
-$SIG{'HUP'} = sub
-{
-  1;
-};
 $SIG{'USR1'} = sub
 {
   $main::ENV{CRUNCH_DEBUG} = 1;
 $SIG{'USR1'} = sub
 {
   $main::ENV{CRUNCH_DEBUG} = 1;
@@ -122,10 +129,8 @@ $SIG{'USR2'} = sub
 
 
 
 
 
 
-my $arv = Arvados->new;
-my $metastream = Warehouse::Stream->new(whc => new Warehouse);
-$metastream->clear;
-$metastream->write_start('log.txt');
+my $arv = Arvados->new('apiVersion' => 'v1');
+my $metastream;
 
 my $User = $arv->{'users'}->{'current'}->execute;
 
 
 my $User = $arv->{'users'}->{'current'}->execute;
 
@@ -170,7 +175,8 @@ else
 }
 $job_id = $Job->{'uuid'};
 
 }
 $job_id = $Job->{'uuid'};
 
-
+my $keep_logfile = $job_id . '.log.txt';
+my $local_logfile = File::Temp->new();
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
@@ -189,7 +195,7 @@ if (!$have_slurm)
 }
 if (exists $ENV{SLURM_NODELIST})
 {
 }
 if (exists $ENV{SLURM_NODELIST})
 {
-  push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
+  push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
 }
 foreach (@sinfo)
 {
 }
 foreach (@sinfo)
 {
@@ -257,20 +263,17 @@ my $jobmanager_id;
 if ($job_has_uuid)
 {
   # Claim this job, and make sure nobody else does
 if ($job_has_uuid)
 {
   # Claim this job, and make sure nobody else does
-
-  $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
-  $Job->{'started_at'} = gmtime;
-  $Job->{'running'} = 1;
-  $Job->{'success'} = undef;
-  $Job->{'tasks_summary'} = { 'failed' => 0,
-                              'todo' => 1,
-                              'running' => 0,
-                              'done' => 0 };
-  if ($job_has_uuid) {
-    unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
-      croak("Error while updating / locking job");
-    }
+  unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
+          $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
+    croak("Error while updating / locking job");
   }
   }
+  $Job->update_attributes('started_at' => scalar gmtime,
+                          'running' => 1,
+                          'success' => undef,
+                          'tasks_summary' => { 'failed' => 0,
+                                               'todo' => 1,
+                                               'running' => 0,
+                                               'done' => 0 });
 }
 
 
 }
 
 
@@ -281,9 +284,12 @@ $SIG{'TERM'} = \&croak;
 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
 $SIG{'ALRM'} = sub { $main::please_info = 1; };
 $SIG{'CONT'} = sub { $main::please_continue = 1; };
 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
 $SIG{'ALRM'} = sub { $main::please_info = 1; };
 $SIG{'CONT'} = sub { $main::please_continue = 1; };
+$SIG{'HUP'} = sub { $main::please_refresh = 1; };
+
 $main::please_freeze = 0;
 $main::please_info = 0;
 $main::please_continue = 0;
 $main::please_freeze = 0;
 $main::please_info = 0;
 $main::please_continue = 0;
+$main::please_refresh = 0;
 my $jobsteps_must_output_keys = 0;     # becomes 1 when any task outputs a key
 
 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
 my $jobsteps_must_output_keys = 0;     # becomes 1 when any task outputs a key
 
 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
@@ -299,6 +305,7 @@ my $jobstep_tomerge_level = 0;
 my $squeue_checked;
 my $squeue_kill_checked;
 my $output_in_keep = 0;
 my $squeue_checked;
 my $squeue_kill_checked;
 my $output_in_keep = 0;
+my $latest_refresh = scalar time;
 
 
 
 
 
 
@@ -315,13 +322,19 @@ else
     'parameters' => {},
                                                           });
   push @jobstep, { 'level' => 0,
     'parameters' => {},
                                                           });
   push @jobstep, { 'level' => 0,
-                  'attempts' => 0,
+                  'failures' => 0,
                    'arvados_task' => $first_task,
                 };
   push @jobstep_todo, 0;
 }
 
 
                    'arvados_task' => $first_task,
                 };
   push @jobstep_todo, 0;
 }
 
 
+if (!$have_slurm)
+{
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
+
 my $build_script;
 
 
 my $build_script;
 
 
@@ -330,7 +343,21 @@ $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
+  if (!defined $no_clear_tmp) {
+    my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
+    system($clear_tmp_cmd) == 0
+       or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
+  }
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
+  for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
+    if (-d $src_path) {
+      system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
+          or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
+      system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
+          == 0
+          or croak ("setup.py in $src_path failed: exit ".($?>>8));
+    }
+  }
 }
 else
 {
 }
 else
 {
@@ -341,22 +368,24 @@ else
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
-  # Clean out crunch_tmp/work and crunch_tmp/opt
+  if (!defined $no_clear_tmp) {
+    # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
 
 
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-         ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
+    my $cleanpid = fork();
+    if ($cleanpid == 0)
+    {
+      srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
+      exit (1);
+    }
+    while (1)
+    {
+      last if $cleanpid == waitpid (-1, WNOHANG);
+      freeze_if_want_freeze ($cleanpid);
+      select (undef, undef, undef, 0.1);
+    }
+    Log (undef, "Clean-work-dir exited $?");
   }
   }
-  Log (undef, "Clean-work-dir exited $?");
 
   # Install requested code version
 
 
   # Install requested code version
 
@@ -367,29 +396,37 @@ else
 
   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
 
   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
-  $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
 
   my $commit;
   my $git_archive;
   my $treeish = $Job->{'script_version'};
 
   my $commit;
   my $git_archive;
   my $treeish = $Job->{'script_version'};
-  my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
-  # Todo: let script_version specify repository instead of expecting
-  # parent process to figure it out.
-  $ENV{"CRUNCH_SRC_URL"} = $repo;
 
 
-  # Create/update our clone of the remote git repo
+  # If we're running under crunch-dispatch, it will have pulled the
+  # appropriate source tree into its own repository, and given us that
+  # repo's path as $git_dir. If we're running a "local" job, and a
+  # script_version was specified, it's up to the user to provide the
+  # full path to a local repository in Job->{repository}.
+  #
+  # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
+  # git-archive --remote where appropriate.
+  #
+  # TODO: Accept a locally-hosted Arvados repository by name or
+  # UUID. Use arvados.v1.repositories.list or .get to figure out the
+  # appropriate fetch-url.
+  my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
+
+  $ENV{"CRUNCH_SRC_URL"} = $repo;
 
 
-  if (!-d $ENV{"CRUNCH_SRC"}) {
-    system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
-       or croak ("git clone $repo failed: exit ".($?>>8));
-    system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
+  if (-d "$repo/.git") {
+    # We were given a working directory, but we are only interested in
+    # the index.
+    $repo = "$repo/.git";
   }
   }
-  `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
 
   # If this looks like a subversion r#, look for it in git-svn commit messages
 
   if ($treeish =~ m{^\d{1,4}$}) {
 
   # If this looks like a subversion r#, look for it in git-svn commit messages
 
   if ($treeish =~ m{^\d{1,4}$}) {
-    my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
+    my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
     chomp $gitlog;
     if ($gitlog =~ /^[a-f0-9]{40}$/) {
       $commit = $gitlog;
     chomp $gitlog;
     if ($gitlog =~ /^[a-f0-9]{40}$/) {
       $commit = $gitlog;
@@ -400,15 +437,7 @@ else
   # If that didn't work, try asking git to look it up as a tree-ish.
 
   if (!defined $commit) {
   # If that didn't work, try asking git to look it up as a tree-ish.
 
   if (!defined $commit) {
-
-    my $cooked_treeish = $treeish;
-    if ($treeish !~ m{^[0-9a-f]{5,}$}) {
-      # Looks like a git branch name -- make sure git knows it's
-      # relative to the remote repo
-      $cooked_treeish = "origin/$treeish";
-    }
-
-    my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
+    my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
     chomp $found;
     if ($found =~ /^[0-9a-f]{40}$/s) {
       $commit = $found;
     chomp $found;
     if ($found =~ /^[0-9a-f]{40}$/s) {
       $commit = $found;
@@ -421,7 +450,9 @@ else
        Log (undef, "Using commit $commit for tree-ish $treeish");
         if ($commit ne $treeish) {
           $Job->{'script_version'} = $commit;
        Log (undef, "Using commit $commit for tree-ish $treeish");
         if ($commit ne $treeish) {
           $Job->{'script_version'} = $commit;
-          !$job_has_uuid or $Job->save() or croak("Error while updating job");
+          !$job_has_uuid or
+              $Job->update_attributes('script_version' => $commit) or
+              croak("Error while updating job");
         }
       }
     }
         }
       }
     }
@@ -431,7 +462,7 @@ else
     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
     @execargs = ("sh", "-c",
                 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
     @execargs = ("sh", "-c",
                 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
-    $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
+    $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
   }
   else {
     croak ("could not figure out commit id for $treeish");
   }
   else {
     croak ("could not figure out commit id for $treeish");
@@ -452,6 +483,12 @@ else
   Log (undef, "Install exited $?");
 }
 
   Log (undef, "Install exited $?");
 }
 
+if (!$have_slurm)
+{
+  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
@@ -467,7 +504,7 @@ foreach (split (/\n/, $Job->{knobs}))
 
 
 
 
 
 
-my $success;
+$main::success = undef;
 
 
 
 
 
 
@@ -504,12 +541,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
   {
     next;
   }
   {
     next;
   }
-  if ($Jobstep->{attempts} > 2)
-  {
-    Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
-    $success = 0;
-    last THISROUND;
-  }
 
   pipe $reader{$id}, "writer" or croak ($!);
   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
 
   pipe $reader{$id}, "writer" or croak ($!);
   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
@@ -549,9 +580,11 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
-    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
+    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
+    $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+    $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
 
     $ENV{"GZIP"} = "-n";
 
 
     $ENV{"GZIP"} = "-n";
 
@@ -564,7 +597,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     my @execargs = qw(sh);
     my $build_script_to_send = "";
     my $command =
     my @execargs = qw(sh);
     my $build_script_to_send = "";
     my $command =
-       "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
+       "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
+        ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
        ."&& cd $ENV{CRUNCH_TMP} ";
     if ($build_script)
     {
        ."&& cd $ENV{CRUNCH_TMP} ";
     if ($build_script)
     {
@@ -572,14 +606,11 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       $command .=
          "&& perl -";
     }
       $command .=
          "&& perl -";
     }
-    $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
-    $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
-    $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
     $command .=
     $command .=
-        "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+        "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     my @execargs = ('bash', '-c', $command);
     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
     my @execargs = ('bash', '-c', $command);
     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
-    exit (1);
+    exit (111);
   }
   close("writer");
   if (!defined $childpid)
   }
   close("writer");
   if (!defined $childpid)
@@ -599,7 +630,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 
   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
   Log ($id, "child $childpid started on $childslotname");
 
   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
   Log ($id, "child $childpid started on $childslotname");
-  $Jobstep->{attempts} ++;
   $Jobstep->{starttime} = time;
   $Jobstep->{node} = $childnode->{name};
   $Jobstep->{slotindex} = $childslot;
   $Jobstep->{starttime} = time;
   $Jobstep->{node} = $childnode->{name};
   $Jobstep->{slotindex} = $childslot;
@@ -629,6 +659,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
        + reapchildren ();
     if (!$gotsome)
     {
        + reapchildren ();
     if (!$gotsome)
     {
+      check_refresh_wanted();
       check_squeue();
       update_progress_stats();
       select (undef, undef, undef, 0.1);
       check_squeue();
       update_progress_stats();
       select (undef, undef, undef, 0.1);
@@ -685,6 +716,7 @@ while (%proc)
   readfrompipes ();
   if (!reapchildren())
   {
   readfrompipes ();
   if (!reapchildren())
   {
+    check_refresh_wanted();
     check_squeue();
     update_progress_stats();
     select (undef, undef, undef, 0.1);
     check_squeue();
     update_progress_stats();
     select (undef, undef, undef, 0.1);
@@ -696,7 +728,7 @@ update_progress_stats();
 freeze_if_want_freeze();
 
 
 freeze_if_want_freeze();
 
 
-if (!defined $success)
+if (!defined $main::success)
 {
   if (@jobstep_todo &&
       $thisround_succeeded == 0 &&
 {
   if (@jobstep_todo &&
       $thisround_succeeded == 0 &&
@@ -704,34 +736,44 @@ if (!defined $success)
   {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
   {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
-    $success = 0;
+    $main::success = 0;
   }
   if (!@jobstep_todo)
   {
   }
   if (!@jobstep_todo)
   {
-    $success = 1;
+    $main::success = 1;
   }
 }
 
   }
 }
 
-goto ONELEVEL if !defined $success;
+goto ONELEVEL if !defined $main::success;
 
 
 release_allocation();
 freeze();
 
 
 release_allocation();
 freeze();
-$Job->reload;
-$Job->{'output'} = &collate_output();
-$Job->{'running'} = 0;
-$Job->{'success'} = $Job->{'output'} && $success;
-$Job->{'finished_at'} = gmtime;
-$Job->save if $job_has_uuid;
+if ($job_has_uuid) {
+  $Job->update_attributes('output' => &collate_output(),
+                          'running' => 0,
+                          'success' => $Job->{'output'} && $main::success,
+                          'finished_at' => scalar gmtime)
+}
 
 if ($Job->{'output'})
 {
   eval {
 
 if ($Job->{'output'})
 {
   eval {
-    my $manifest_text = capturex("whget", $Job->{'output'});
+    my $manifest_text = `arv keep get ''\Q$Job->{'output'}\E`;
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
     });
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
     });
+    if ($Job->{'output_is_persistent'}) {
+      $arv->{'links'}->{'create'}->execute('link' => {
+        'tail_kind' => 'arvados#user',
+        'tail_uuid' => $User->{'uuid'},
+        'head_kind' => 'arvados#collection',
+        'head_uuid' => $Job->{'output'},
+        'link_class' => 'resources',
+        'name' => 'wants',
+      });
+    }
   };
   if ($@) {
     Log (undef, "Failed to register output manifest: $@");
   };
   if ($@) {
     Log (undef, "Failed to register output manifest: $@");
@@ -756,7 +798,9 @@ sub update_progress_stats
   $Job->{'tasks_summary'}->{'todo'} = $todo;
   $Job->{'tasks_summary'}->{'done'} = $done;
   $Job->{'tasks_summary'}->{'running'} = $running;
   $Job->{'tasks_summary'}->{'todo'} = $todo;
   $Job->{'tasks_summary'}->{'done'} = $done;
   $Job->{'tasks_summary'}->{'running'} = $running;
-  $Job->save if $job_has_uuid;
+  if ($job_has_uuid) {
+    $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
+  }
   Log (undef, "status: $done done, $running running, $todo todo");
   $progress_is_dirty = 0;
 }
   Log (undef, "status: $done done, $running running, $todo todo");
   $progress_is_dirty = 0;
 }
@@ -775,27 +819,32 @@ sub reapchildren
   my $elapsed = time - $proc{$pid}->{time};
   my $Jobstep = $jobstep[$jobstepid];
 
   my $elapsed = time - $proc{$pid}->{time};
   my $Jobstep = $jobstep[$jobstepid];
 
-  my $exitcode = $?;
-  my $exitinfo = "exit $exitcode";
+  my $childstatus = $?;
+  my $exitvalue = $childstatus >> 8;
+  my $exitinfo = sprintf("exit %d signal %d%s",
+                         $exitvalue,
+                         $childstatus & 127,
+                         ($childstatus & 128 ? ' core dump' : ''));
   $Jobstep->{'arvados_task'}->reload;
   $Jobstep->{'arvados_task'}->reload;
-  my $success = $Jobstep->{'arvados_task'}->{success};
+  my $task_success = $Jobstep->{'arvados_task'}->{success};
 
 
-  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
+  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
 
 
-  if (!defined $success) {
+  if (!defined $task_success) {
     # task did not indicate one way or the other --> fail
     $Jobstep->{'arvados_task'}->{success} = 0;
     $Jobstep->{'arvados_task'}->save;
     # task did not indicate one way or the other --> fail
     $Jobstep->{'arvados_task'}->{success} = 0;
     $Jobstep->{'arvados_task'}->save;
-    $success = 0;
+    $task_success = 0;
   }
 
   }
 
-  if (!$success)
+  if (!$task_success)
   {
   {
-    my $no_incr_attempts;
-    $no_incr_attempts = 1 if $Jobstep->{node_fail};
+    my $temporary_fail;
+    $temporary_fail ||= $Jobstep->{node_fail};
+    $temporary_fail ||= ($exitvalue == 111);
 
     ++$thisround_failed;
 
     ++$thisround_failed;
-    ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
+    ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
 
     # Check for signs of a failed or misconfigured node
     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
 
     # Check for signs of a failed or misconfigured node
     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
@@ -803,18 +852,28 @@ sub reapchildren
       # Don't count this against jobstep failure thresholds if this
       # node is already suspected faulty and srun exited quickly
       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
       # Don't count this against jobstep failure thresholds if this
       # node is already suspected faulty and srun exited quickly
       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
-         $elapsed < 5 &&
-         $Jobstep->{attempts} > 1) {
-       Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
-        $no_incr_attempts = 1;
+         $elapsed < 5) {
+       Log ($jobstepid, "blaming failure on suspect node " .
+             $slot[$proc{$pid}->{slot}]->{node}->{name});
+        $temporary_fail ||= 1;
       }
       ban_node_by_slot($proc{$pid}->{slot});
     }
 
       }
       ban_node_by_slot($proc{$pid}->{slot});
     }
 
-    push @jobstep_todo, $jobstepid;
-    Log ($jobstepid, "failure in $elapsed seconds");
+    Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
+                             ++$Jobstep->{'failures'},
+                             $temporary_fail ? 'temporary ' : 'permanent',
+                             $elapsed));
 
 
-    --$Jobstep->{attempts} if $no_incr_attempts;
+    if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+      # Give up on this task, and the whole job
+      $main::success = 0;
+      $main::please_freeze = 1;
+    }
+    else {
+      # Put this task back on the todo queue
+      push @jobstep_todo, $jobstepid;
+    }
     $Job->{'tasks_summary'}->{'failed'}++;
   }
   else
     $Job->{'tasks_summary'}->{'failed'}++;
   }
   else
@@ -825,9 +884,9 @@ sub reapchildren
     push @jobstep_done, $jobstepid;
     Log ($jobstepid, "success in $elapsed seconds");
   }
     push @jobstep_done, $jobstepid;
     Log ($jobstepid, "success in $elapsed seconds");
   }
-  $Jobstep->{exitcode} = $exitcode;
+  $Jobstep->{exitcode} = $childstatus;
   $Jobstep->{finishtime} = time;
   $Jobstep->{finishtime} = time;
-  process_stderr ($jobstepid, $success);
+  process_stderr ($jobstepid, $task_success);
   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
 
   close $reader{$jobstepid};
   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
 
   close $reader{$jobstepid};
@@ -846,7 +905,7 @@ sub reapchildren
   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
     my $jobstep = {
       'level' => $arvados_task->{'sequence'},
   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
     my $jobstep = {
       'level' => $arvados_task->{'sequence'},
-      'attempts' => 0,
+      'failures' => 0,
       'arvados_task' => $arvados_task
     };
     push @jobstep, $jobstep;
       'arvados_task' => $arvados_task
     };
     push @jobstep, $jobstep;
@@ -857,6 +916,27 @@ sub reapchildren
   1;
 }
 
   1;
 }
 
+sub check_refresh_wanted
+{
+  my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
+  if (@stat && $stat[9] > $latest_refresh) {
+    $latest_refresh = scalar time;
+    if ($job_has_uuid) {
+      my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+      for my $attr ('cancelled_at',
+                    'cancelled_by_user_uuid',
+                    'cancelled_by_client_uuid') {
+        $Job->{$attr} = $Job2->{$attr};
+      }
+      if ($Job->{'cancelled_at'}) {
+        Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
+             " by user " . $Job->{cancelled_by_user_uuid});
+        $main::success = 0;
+        $main::please_freeze = 1;
+      }
+    }
+  }
+}
 
 sub check_squeue
 {
 
 sub check_squeue
 {
@@ -981,7 +1061,7 @@ sub preprocess_stderr
 sub process_stderr
 {
   my $job = shift;
 sub process_stderr
 {
   my $job = shift;
-  my $success = shift;
+  my $task_success = shift;
   preprocess_stderr ($job);
 
   map {
   preprocess_stderr ($job);
 
   map {
@@ -989,12 +1069,24 @@ sub process_stderr
   } split ("\n", $jobstep[$job]->{stderr});
 }
 
   } split ("\n", $jobstep[$job]->{stderr});
 }
 
+sub fetch_block
+{
+  my $hash = shift;
+  my ($keep, $child_out, $output_block);
+
+  my $cmd = "arv keep get \Q$hash\E";
+  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
+  sysread($keep, $output_block, 64 * 1024 * 1024);
+  close $keep;
+  return $output_block;
+}
 
 sub collate_output
 {
 
 sub collate_output
 {
-  my $whc = Warehouse->new;
   Log (undef, "collate");
   Log (undef, "collate");
-  $whc->write_start (1);
+
+  my ($child_out, $child_in);
+  my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
   my $joboutput;
   for (@jobstep)
   {
   my $joboutput;
   for (@jobstep)
   {
@@ -1005,31 +1097,41 @@ sub collate_output
     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
     {
       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
     {
       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($output);
+      print $child_in $output;
     }
     elsif (@jobstep == 1)
     {
       $joboutput = $output;
     }
     elsif (@jobstep == 1)
     {
       $joboutput = $output;
-      $whc->write_finish;
+      last;
     }
     }
-    elsif (defined (my $outblock = $whc->fetch_block ($output)))
+    elsif (defined (my $outblock = fetch_block ($output)))
     {
       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
     {
       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($outblock);
+      print $child_in $outblock;
     }
     else
     {
     }
     else
     {
-      my $errstr = $whc->errstr;
-      $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
-      $success = 0;
+      Log (undef, "XXX fetch_block($output) failed XXX");
+      $main::success = 0;
     }
   }
     }
   }
-  $joboutput = $whc->write_finish if !defined $joboutput;
+  $child_in->close;
+
+  if (!defined $joboutput) {
+    my $s = IO::Select->new($child_out);
+    if ($s->can_read(120)) {
+      sysread($child_out, $joboutput, 64 * 1024 * 1024);
+      chomp($joboutput);
+    } else {
+      Log (undef, "timed out reading from 'arv keep put'");
+    }
+  }
+  waitpid($pid, 0);
+
   if ($joboutput)
   {
     Log (undef, "output $joboutput");
   if ($joboutput)
   {
     Log (undef, "output $joboutput");
-    $Job->{'output'} = $joboutput;
-    $Job->save if $job_has_uuid;
+    $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
   }
   else
   {
   }
   else
   {
@@ -1100,8 +1202,9 @@ sub Log                           # ($jobstep_id, $logmessage)
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
-  return if !$metastream;
-  $metastream->write_data ($datetime . " " . $message);
+  if ($metastream) {
+    print $metastream $datetime . " " . $message;
+  }
 }
 
 
 }
 
 
@@ -1121,25 +1224,27 @@ sub croak
 sub cleanup
 {
   return if !$job_has_uuid;
 sub cleanup
 {
   return if !$job_has_uuid;
-  $Job->reload;
-  $Job->{'running'} = 0;
-  $Job->{'success'} = 0;
-  $Job->{'finished_at'} = gmtime;
-  $Job->save;
+  $Job->update_attributes('running' => 0,
+                          'success' => 0,
+                          'finished_at' => scalar gmtime);
 }
 
 
 sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
 }
 
 
 sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
-  my $m = $metastream;
-  $m = $m->copy if $justcheckpoint;
-  $m->write_finish;
-  my $loglocator = $m->as_key;
-  undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
-  Log (undef, "meta key is $loglocator");
+  return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
+
+  $local_logfile->flush;
+  my $cmd = "arv keep put --filename ''\Q$keep_logfile\E "
+      . quotemeta($local_logfile->filename);
+  my $loglocator = `$cmd`;
+  die "system $cmd failed: $?" if $?;
+
+  $local_logfile = undef;   # the temp file is automatically deleted
+  Log (undef, "log manifest is $loglocator");
   $Job->{'log'} = $loglocator;
   $Job->{'log'} = $loglocator;
-  $Job->save if $job_has_uuid;
+  $Job->update_attributes('log', $loglocator) if $job_has_uuid;
 }
 
 
 }
 
 
@@ -1182,65 +1287,6 @@ sub freeze
 sub thaw
 {
   croak ("Thaw not implemented");
 sub thaw
 {
   croak ("Thaw not implemented");
-
-  my $whc;
-  my $key = shift;
-  Log (undef, "thaw from $key");
-
-  @jobstep = ();
-  @jobstep_done = ();
-  @jobstep_todo = ();
-  @jobstep_tomerge = ();
-  $jobstep_tomerge_level = 0;
-  my $frozenjob = {};
-
-  my $stream = new Warehouse::Stream ( whc => $whc,
-                                      hash => [split (",", $key)] );
-  $stream->rewind;
-  while (my $dataref = $stream->read_until (undef, "\n\n"))
-  {
-    if ($$dataref =~ /^job /)
-    {
-      foreach (split ("\n", $$dataref))
-      {
-       my ($k, $v) = split ("=", $_, 2);
-       $frozenjob->{$k} = freezeunquote ($v);
-      }
-      next;
-    }
-
-    if ($$dataref =~ /^merge (\d+) (.*)/)
-    {
-      $jobstep_tomerge_level = $1;
-      @jobstep_tomerge
-         = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
-      next;
-    }
-
-    my $Jobstep = { };
-    foreach (split ("\n", $$dataref))
-    {
-      my ($k, $v) = split ("=", $_, 2);
-      $Jobstep->{$k} = freezeunquote ($v) if $k;
-    }
-    $Jobstep->{attempts} = 0;
-    push @jobstep, $Jobstep;
-
-    if ($Jobstep->{exitcode} eq "0")
-    {
-      push @jobstep_done, $#jobstep;
-    }
-    else
-    {
-      push @jobstep_todo, $#jobstep;
-    }
-  }
-
-  foreach (qw (script script_version script_parameters))
-  {
-    $Job->{$_} = $frozenjob->{$_};
-  }
-  $Job->save if $job_has_uuid;
 }
 
 
 }
 
 
@@ -1300,6 +1346,15 @@ sub ban_node_by_slot {
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
+sub must_lock_now
+{
+  my ($lockfile, $error_message) = @_;
+  open L, ">", $lockfile or croak("$lockfile: $!");
+  if (!flock L, LOCK_EX|LOCK_NB) {
+    croak("Can't lock $lockfile: $error_message\n");
+  }
+}
+
 __DATA__
 #!/usr/bin/perl
 
 __DATA__
 #!/usr/bin/perl
 
@@ -1313,7 +1368,7 @@ my $repo = $ENV{"CRUNCH_SRC_URL"};
 
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
 
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
-if (readlink ("$destdir.commit") eq $commit) {
+if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
     exit 0;
 }
 
     exit 0;
 }
 
@@ -1322,16 +1377,27 @@ open STDOUT, ">", "$destdir.log";
 open STDERR, ">&STDOUT";
 
 mkdir $destdir;
 open STDERR, ">&STDOUT";
 
 mkdir $destdir;
-open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
-print TARX <DATA>;
-if(!close(TARX)) {
-  die "'tar -C $destdir -xf -' exited $?: $!";
+my @git_archive_data = <DATA>;
+if (@git_archive_data) {
+  open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
+  print TARX @git_archive_data;
+  if(!close(TARX)) {
+    die "'tar -C $destdir -xf -' exited $?: $!";
+  }
 }
 
 my $pwd;
 chomp ($pwd = `pwd`);
 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
 mkdir $install_dir;
 }
 
 my $pwd;
 chomp ($pwd = `pwd`);
 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
 mkdir $install_dir;
+
+for my $src_path ("$destdir/arvados/sdk/python") {
+  if (-d $src_path) {
+    shell_or_die ("virtualenv", $install_dir);
+    shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
+  }
+}
+
 if (-e "$destdir/crunch_scripts/install") {
     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
 if (-e "$destdir/crunch_scripts/install") {
     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {