move crunch-job to arv-crunch-job and add to arvados-cli gem
[arvados.git] / services / crunch / crunch-job
deleted file mode 100755 (executable)
index 10acb8cbbe72ee99a539e49ebd0dfe168405c924..0000000000000000000000000000000000000000
+++ /dev/null
-#!/usr/bin/perl
-# -*- mode: perl; perl-indent-level: 2; -*-
-
-=head1 NAME
-
-crunch-job: Execute job steps, save snapshots as requested, collate output.
-
-=head1 SYNOPSIS
-
-Obtain job details from Arvados, run tasks on compute nodes (typically
-invoked by scheduler on controller):
-
- crunch-job --job x-y-z
-
-Obtain job details from command line, run tasks on local machine
-(typically invoked by application or developer on VM):
-
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
-
-=head1 OPTIONS
-
-=over
-
-=item --force-unlock
-
-If the job is already locked, steal the lock and run it anyway.
-
-=item --git-dir
-
-Path to .git directory where the specified commit is found.
-
-=item --job-api-token
-
-Arvados API authorization token to use during the course of the job.
-
-=back
-
-=head1 RUNNING JOBS LOCALLY
-
-crunch-job's log messages appear on stderr along with the job tasks'
-stderr streams. The log is saved in Keep at each checkpoint and when
-the job finishes.
-
-If the job succeeds, the job's output locator is printed on stdout.
-
-While the job is running, the following signals are accepted:
-
-=over
-
-=item control-C, SIGINT, SIGQUIT
-
-Save a checkpoint, terminate any job tasks that are running, and stop.
-
-=item SIGALRM
-
-Save a checkpoint and continue.
-
-=item SIGHUP
-
-Refresh node allocation (i.e., check whether any nodes have been added
-or unallocated). Currently this is a no-op.
-
-=back
-
-=cut
-
-
-use strict;
-use POSIX ':sys_wait_h';
-use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use Arvados;
-use Getopt::Long;
-use Warehouse;
-use Warehouse::Stream;
-use IPC::System::Simple qw(capturex);
-
-$ENV{"TMPDIR"} ||= "/tmp";
-$ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
-if ($ENV{"USER"} ne "crunch" && $< != 0) {
-  # use a tmp dir unique for my uid
-  $ENV{"CRUNCH_TMP"} .= "-$<";
-}
-$ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
-$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
-mkdir ($ENV{"JOB_WORK"});
-
-my $force_unlock;
-my $git_dir;
-my $jobspec;
-my $job_api_token;
-my $resume_stash;
-GetOptions('force-unlock' => \$force_unlock,
-           'git-dir=s' => \$git_dir,
-           'job=s' => \$jobspec,
-           'job-api-token=s' => \$job_api_token,
-           'resume-stash=s' => \$resume_stash,
-    );
-
-if (defined $job_api_token) {
-  $ENV{ARVADOS_API_TOKEN} = $job_api_token;
-}
-
-my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
-my $local_job = !$job_has_uuid;
-
-
-$SIG{'HUP'} = sub
-{
-  1;
-};
-$SIG{'USR1'} = sub
-{
-  $main::ENV{CRUNCH_DEBUG} = 1;
-};
-$SIG{'USR2'} = sub
-{
-  $main::ENV{CRUNCH_DEBUG} = 0;
-};
-
-
-
-my $arv = Arvados->new;
-my $metastream = Warehouse::Stream->new(whc => new Warehouse);
-$metastream->clear;
-$metastream->write_start('log.txt');
-
-my $User = $arv->{'users'}->{'current'}->execute;
-
-my $Job = {};
-my $job_id;
-my $dbh;
-my $sth;
-if ($job_has_uuid)
-{
-  $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
-  if (!$force_unlock) {
-    if ($Job->{'is_locked_by_uuid'}) {
-      croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
-    }
-    if ($Job->{'success'} ne undef) {
-      croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
-    }
-    if ($Job->{'running'}) {
-      croak("Job 'running' flag is already set");
-    }
-    if ($Job->{'started_at'}) {
-      croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
-    }
-  }
-}
-else
-{
-  $Job = JSON::decode_json($jobspec);
-
-  if (!$resume_stash)
-  {
-    map { croak ("No $_ specified") unless $Job->{$_} }
-    qw(script script_version script_parameters);
-  }
-
-  $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
-  $Job->{'started_at'} = gmtime;
-
-  $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
-
-  $job_has_uuid = 1;
-}
-$job_id = $Job->{'uuid'};
-
-
-
-$Job->{'resource_limits'} ||= {};
-$Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
-my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
-
-
-Log (undef, "check slurm allocation");
-my @slot;
-my @node;
-# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
-my @sinfo;
-if (!$have_slurm)
-{
-  my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
-  push @sinfo, "$localcpus localhost";
-}
-if (exists $ENV{SLURM_NODELIST})
-{
-  push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
-}
-foreach (@sinfo)
-{
-  my ($ncpus, $slurm_nodelist) = split;
-  $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
-
-  my @nodelist;
-  while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
-  {
-    my $nodelist = $1;
-    if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
-    {
-      my $ranges = $1;
-      foreach (split (",", $ranges))
-      {
-       my ($a, $b);
-       if (/(\d+)-(\d+)/)
-       {
-         $a = $1;
-         $b = $2;
-       }
-       else
-       {
-         $a = $_;
-         $b = $_;
-       }
-       push @nodelist, map {
-         my $n = $nodelist;
-         $n =~ s/\[[-,\d]+\]/$_/;
-         $n;
-       } ($a..$b);
-      }
-    }
-    else
-    {
-      push @nodelist, $nodelist;
-    }
-  }
-  foreach my $nodename (@nodelist)
-  {
-    Log (undef, "node $nodename - $ncpus slots");
-    my $node = { name => $nodename,
-                ncpus => $ncpus,
-                losing_streak => 0,
-                hold_until => 0 };
-    foreach my $cpu (1..$ncpus)
-    {
-      push @slot, { node => $node,
-                   cpu => $cpu };
-    }
-  }
-  push @node, @nodelist;
-}
-
-
-
-# Ensure that we get one jobstep running on each allocated node before
-# we start overloading nodes with concurrent steps
-
-@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
-
-
-
-my $jobmanager_id;
-if ($job_has_uuid)
-{
-  # Claim this job, and make sure nobody else does
-
-  $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
-  $Job->{'started_at'} = gmtime;
-  $Job->{'running'} = 1;
-  $Job->{'success'} = undef;
-  $Job->{'tasks_summary'} = { 'failed' => 0,
-                              'todo' => 1,
-                              'running' => 0,
-                              'done' => 0 };
-  if ($job_has_uuid) {
-    unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
-      croak("Error while updating / locking job");
-    }
-  }
-}
-
-
-Log (undef, "start");
-$SIG{'INT'} = sub { $main::please_freeze = 1; };
-$SIG{'QUIT'} = sub { $main::please_freeze = 1; };
-$SIG{'TERM'} = \&croak;
-$SIG{'TSTP'} = sub { $main::please_freeze = 1; };
-$SIG{'ALRM'} = sub { $main::please_info = 1; };
-$SIG{'CONT'} = sub { $main::please_continue = 1; };
-$main::please_freeze = 0;
-$main::please_info = 0;
-$main::please_continue = 0;
-my $jobsteps_must_output_keys = 0;     # becomes 1 when any task outputs a key
-
-grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
-$ENV{"CRUNCH_JOB_UUID"} = $job_id;
-$ENV{"JOB_UUID"} = $job_id;
-
-
-my @jobstep;
-my @jobstep_todo = ();
-my @jobstep_done = ();
-my @jobstep_tomerge = ();
-my $jobstep_tomerge_level = 0;
-my $squeue_checked;
-my $squeue_kill_checked;
-my $output_in_keep = 0;
-
-
-
-if (defined $Job->{thawedfromkey})
-{
-  thaw ($Job->{thawedfromkey});
-}
-else
-{
-  my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
-    'job_uuid' => $Job->{'uuid'},
-    'sequence' => 0,
-    'qsequence' => 0,
-    'parameters' => {},
-                                                          });
-  push @jobstep, { 'level' => 0,
-                  'attempts' => 0,
-                   'arvados_task' => $first_task,
-                };
-  push @jobstep_todo, 0;
-}
-
-
-my $build_script;
-
-
-$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
-
-my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
-if ($skip_install)
-{
-  $ENV{"CRUNCH_SRC"} = $Job->{script_version};
-}
-else
-{
-  do {
-    local $/ = undef;
-    $build_script = <DATA>;
-  };
-  Log (undef, "Install revision ".$Job->{script_version});
-  my $nodelist = join(",", @node);
-
-  # Clean out crunch_tmp/work and crunch_tmp/opt
-
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-         ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
-  }
-  Log (undef, "Clean-work-dir exited $?");
-
-  # Install requested code version
-
-  my @execargs;
-  my @srunargs = ("srun",
-                 "--nodelist=$nodelist",
-                 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
-
-  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
-  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
-  $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
-
-  my $commit;
-  my $git_archive;
-  my $treeish = $Job->{'script_version'};
-  my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
-  # Todo: let script_version specify repository instead of expecting
-  # parent process to figure it out.
-  $ENV{"CRUNCH_SRC_URL"} = $repo;
-
-  # Create/update our clone of the remote git repo
-
-  if (!-d $ENV{"CRUNCH_SRC"}) {
-    system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
-       or croak ("git clone $repo failed: exit ".($?>>8));
-    system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
-  }
-  `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
-
-  # If this looks like a subversion r#, look for it in git-svn commit messages
-
-  if ($treeish =~ m{^\d{1,4}$}) {
-    my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
-    chomp $gitlog;
-    if ($gitlog =~ /^[a-f0-9]{40}$/) {
-      $commit = $gitlog;
-      Log (undef, "Using commit $commit for script_version $treeish");
-    }
-  }
-
-  # If that didn't work, try asking git to look it up as a tree-ish.
-
-  if (!defined $commit) {
-
-    my $cooked_treeish = $treeish;
-    if ($treeish !~ m{^[0-9a-f]{5,}$}) {
-      # Looks like a git branch name -- make sure git knows it's
-      # relative to the remote repo
-      $cooked_treeish = "origin/$treeish";
-    }
-
-    my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
-    chomp $found;
-    if ($found =~ /^[0-9a-f]{40}$/s) {
-      $commit = $found;
-      if ($commit ne $treeish) {
-       # Make sure we record the real commit id in the database,
-       # frozentokey, logs, etc. -- instead of an abbreviation or a
-       # branch name which can become ambiguous or point to a
-       # different commit in the future.
-       $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
-       Log (undef, "Using commit $commit for tree-ish $treeish");
-        if ($commit ne $treeish) {
-          $Job->{'script_version'} = $commit;
-          !$job_has_uuid or $Job->save() or croak("Error while updating job");
-        }
-      }
-    }
-  }
-
-  if (defined $commit) {
-    $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
-    @execargs = ("sh", "-c",
-                "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
-    $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
-  }
-  else {
-    croak ("could not figure out commit id for $treeish");
-  }
-
-  my $installpid = fork();
-  if ($installpid == 0)
-  {
-    srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $installpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($installpid);
-    select (undef, undef, undef, 0.1);
-  }
-  Log (undef, "Install exited $?");
-}
-
-
-
-foreach (qw (script script_version script_parameters resource_limits))
-{
-  Log (undef,
-       "$_ " .
-       (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
-}
-foreach (split (/\n/, $Job->{knobs}))
-{
-  Log (undef, "knob " . $_);
-}
-
-
-
-my $success;
-
-
-
-ONELEVEL:
-
-my $thisround_succeeded = 0;
-my $thisround_failed = 0;
-my $thisround_failed_multiple = 0;
-
-@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
-                      or $a <=> $b } @jobstep_todo;
-my $level = $jobstep[$jobstep_todo[0]]->{level};
-Log (undef, "start level $level");
-
-
-
-my %proc;
-my @freeslot = (0..$#slot);
-my @holdslot;
-my %reader;
-my $progress_is_dirty = 1;
-my $progress_stats_updated = 0;
-
-update_progress_stats();
-
-
-
-THISROUND:
-for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
-{
-  $main::please_continue = 0;
-
-  my $id = $jobstep_todo[$todo_ptr];
-  my $Jobstep = $jobstep[$id];
-  if ($Jobstep->{level} != $level)
-  {
-    next;
-  }
-  if ($Jobstep->{attempts} > 9)
-  {
-    Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
-    $success = 0;
-    last THISROUND;
-  }
-
-  pipe $reader{$id}, "writer" or croak ($!);
-  my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
-  fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
-
-  my $childslot = $freeslot[0];
-  my $childnode = $slot[$childslot]->{node};
-  my $childslotname = join (".",
-                           $slot[$childslot]->{node}->{name},
-                           $slot[$childslot]->{cpu});
-  my $childpid = fork();
-  if ($childpid == 0)
-  {
-    $SIG{'INT'} = 'DEFAULT';
-    $SIG{'QUIT'} = 'DEFAULT';
-    $SIG{'TERM'} = 'DEFAULT';
-
-    foreach (values (%reader))
-    {
-      close($_);
-    }
-    fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
-    open(STDOUT,">&writer");
-    open(STDERR,">&writer");
-
-    undef $dbh;
-    undef $sth;
-
-    delete $ENV{"GNUPGHOME"};
-    $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
-    $ENV{"TASK_QSEQUENCE"} = $id;
-    $ENV{"TASK_SEQUENCE"} = $level;
-    $ENV{"JOB_SCRIPT"} = $Job->{script};
-    while (my ($param, $value) = each %{$Job->{script_parameters}}) {
-      $param =~ tr/a-z/A-Z/;
-      $ENV{"JOB_PARAMETER_$param"} = $value;
-    }
-    $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
-    $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
-    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
-    $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
-    $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
-
-    $ENV{"GZIP"} = "-n";
-
-    my @srunargs = (
-      "srun",
-      "--nodelist=".$childnode->{name},
-      qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
-      "--job-name=$job_id.$id.$$",
-       );
-    my @execargs = qw(sh);
-    my $build_script_to_send = "";
-    my $command =
-       "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
-       ."&& cd $ENV{CRUNCH_TMP} ";
-    if ($build_script)
-    {
-      $build_script_to_send = $build_script;
-      $command .=
-         "&& perl -";
-    }
-    $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
-    $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
-    $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
-    $command .=
-        "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
-    my @execargs = ('bash', '-c', $command);
-    srun (\@srunargs, \@execargs, undef, $build_script_to_send);
-    exit (1);
-  }
-  close("writer");
-  if (!defined $childpid)
-  {
-    close $reader{$id};
-    delete $reader{$id};
-    next;
-  }
-  shift @freeslot;
-  $proc{$childpid} = { jobstep => $id,
-                      time => time,
-                      slot => $childslot,
-                      jobstepname => "$job_id.$id.$childpid",
-                    };
-  croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
-  $slot[$childslot]->{pid} = $childpid;
-
-  Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
-  Log ($id, "child $childpid started on $childslotname");
-  $Jobstep->{attempts} ++;
-  $Jobstep->{starttime} = time;
-  $Jobstep->{node} = $childnode->{name};
-  $Jobstep->{slotindex} = $childslot;
-  delete $Jobstep->{stderr};
-  delete $Jobstep->{finishtime};
-
-  splice @jobstep_todo, $todo_ptr, 1;
-  --$todo_ptr;
-
-  $progress_is_dirty = 1;
-
-  while (!@freeslot
-        ||
-        (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
-  {
-    last THISROUND if $main::please_freeze;
-    if ($main::please_info)
-    {
-      $main::please_info = 0;
-      freeze();
-      collate_output();
-      save_meta(1);
-      update_progress_stats();
-    }
-    my $gotsome
-       = readfrompipes ()
-       + reapchildren ();
-    if (!$gotsome)
-    {
-      check_squeue();
-      update_progress_stats();
-      select (undef, undef, undef, 0.1);
-    }
-    elsif (time - $progress_stats_updated >= 30)
-    {
-      update_progress_stats();
-    }
-    if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
-       ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
-    {
-      my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
-         .($thisround_failed+$thisround_succeeded)
-         .") -- giving up on this round";
-      Log (undef, $message);
-      last THISROUND;
-    }
-
-    # move slots from freeslot to holdslot (or back to freeslot) if necessary
-    for (my $i=$#freeslot; $i>=0; $i--) {
-      if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
-       push @holdslot, (splice @freeslot, $i, 1);
-      }
-    }
-    for (my $i=$#holdslot; $i>=0; $i--) {
-      if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
-       push @freeslot, (splice @holdslot, $i, 1);
-      }
-    }
-
-    # give up if no nodes are succeeding
-    if (!grep { $_->{node}->{losing_streak} == 0 &&
-                    $_->{node}->{hold_count} < 4 } @slot) {
-      my $message = "Every node has failed -- giving up on this round";
-      Log (undef, $message);
-      last THISROUND;
-    }
-  }
-}
-
-
-push @freeslot, splice @holdslot;
-map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
-
-
-Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
-while (%proc)
-{
-  goto THISROUND if $main::please_continue;
-  $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
-  readfrompipes ();
-  if (!reapchildren())
-  {
-    check_squeue();
-    update_progress_stats();
-    select (undef, undef, undef, 0.1);
-    killem (keys %proc) if $main::please_freeze;
-  }
-}
-
-update_progress_stats();
-freeze_if_want_freeze();
-
-
-if (!defined $success)
-{
-  if (@jobstep_todo &&
-      $thisround_succeeded == 0 &&
-      ($thisround_failed == 0 || $thisround_failed > 4))
-  {
-    my $message = "stop because $thisround_failed tasks failed and none succeeded";
-    Log (undef, $message);
-    $success = 0;
-  }
-  if (!@jobstep_todo)
-  {
-    $success = 1;
-  }
-}
-
-goto ONELEVEL if !defined $success;
-
-
-release_allocation();
-freeze();
-$Job->reload;
-$Job->{'output'} = &collate_output();
-$Job->{'running'} = 0;
-$Job->{'success'} = $Job->{'output'} && $success;
-$Job->{'finished_at'} = gmtime;
-$Job->save if $job_has_uuid;
-
-if ($Job->{'output'})
-{
-  eval {
-    my $manifest_text = capturex("whget", $Job->{'output'});
-    $arv->{'collections'}->{'create'}->execute('collection' => {
-      'uuid' => $Job->{'output'},
-      'manifest_text' => $manifest_text,
-    });
-  };
-  if ($@) {
-    Log (undef, "Failed to register output manifest: $@");
-  }
-}
-
-Log (undef, "finish");
-
-save_meta();
-exit 0;
-
-
-
-sub update_progress_stats
-{
-  $progress_stats_updated = time;
-  return if !$progress_is_dirty;
-  my ($todo, $done, $running) = (scalar @jobstep_todo,
-                                scalar @jobstep_done,
-                                scalar @slot - scalar @freeslot - scalar @holdslot);
-  $Job->{'tasks_summary'} ||= {};
-  $Job->{'tasks_summary'}->{'todo'} = $todo;
-  $Job->{'tasks_summary'}->{'done'} = $done;
-  $Job->{'tasks_summary'}->{'running'} = $running;
-  $Job->save if $job_has_uuid;
-  Log (undef, "status: $done done, $running running, $todo todo");
-  $progress_is_dirty = 0;
-}
-
-
-
-sub reapchildren
-{
-  my $pid = waitpid (-1, WNOHANG);
-  return 0 if $pid <= 0;
-
-  my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
-                 . "."
-                 . $slot[$proc{$pid}->{slot}]->{cpu});
-  my $jobstepid = $proc{$pid}->{jobstep};
-  my $elapsed = time - $proc{$pid}->{time};
-  my $Jobstep = $jobstep[$jobstepid];
-
-  my $exitcode = $?;
-  my $exitinfo = "exit $exitcode";
-  $Jobstep->{'arvados_task'}->reload;
-  my $success = $Jobstep->{'arvados_task'}->{success};
-
-  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
-
-  if (!defined $success) {
-    # task did not indicate one way or the other --> fail
-    $Jobstep->{'arvados_task'}->{success} = 0;
-    $Jobstep->{'arvados_task'}->save;
-    $success = 0;
-  }
-
-  if (!$success)
-  {
-    my $no_incr_attempts;
-    $no_incr_attempts = 1 if $Jobstep->{node_fail};
-
-    ++$thisround_failed;
-    ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
-
-    # Check for signs of a failed or misconfigured node
-    if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
-       2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
-      # Don't count this against jobstep failure thresholds if this
-      # node is already suspected faulty and srun exited quickly
-      if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
-         $elapsed < 5 &&
-         $Jobstep->{attempts} > 1) {
-       Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
-        $no_incr_attempts = 1;
-       --$Jobstep->{attempts};
-      }
-      ban_node_by_slot($proc{$pid}->{slot});
-    }
-
-    push @jobstep_todo, $jobstepid;
-    Log ($jobstepid, "failure in $elapsed seconds");
-
-    --$Jobstep->{attempts} if $no_incr_attempts;
-    $Job->{'tasks_summary'}->{'failed'}++;
-  }
-  else
-  {
-    ++$thisround_succeeded;
-    $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
-    $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
-    push @jobstep_done, $jobstepid;
-    Log ($jobstepid, "success in $elapsed seconds");
-  }
-  $Jobstep->{exitcode} = $exitcode;
-  $Jobstep->{finishtime} = time;
-  process_stderr ($jobstepid, $success);
-  Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
-
-  close $reader{$jobstepid};
-  delete $reader{$jobstepid};
-  delete $slot[$proc{$pid}->{slot}]->{pid};
-  push @freeslot, $proc{$pid}->{slot};
-  delete $proc{$pid};
-
-  # Load new tasks
-  my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
-    'where' => {
-      'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-    },
-    'order' => 'qsequence'
-  );
-  foreach my $arvados_task (@{$newtask_list->{'items'}}) {
-    my $jobstep = {
-      'level' => $arvados_task->{'sequence'},
-      'attempts' => 0,
-      'arvados_task' => $arvados_task
-    };
-    push @jobstep, $jobstep;
-    push @jobstep_todo, $#jobstep;
-  }
-
-  $progress_is_dirty = 1;
-  1;
-}
-
-
-sub check_squeue
-{
-  # return if the kill list was checked <4 seconds ago
-  if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
-  {
-    return;
-  }
-  $squeue_kill_checked = time;
-
-  # use killem() on procs whose killtime is reached
-  for (keys %proc)
-  {
-    if (exists $proc{$_}->{killtime}
-       && $proc{$_}->{killtime} <= time)
-    {
-      killem ($_);
-    }
-  }
-
-  # return if the squeue was checked <60 seconds ago
-  if (defined $squeue_checked && $squeue_checked > time - 60)
-  {
-    return;
-  }
-  $squeue_checked = time;
-
-  if (!$have_slurm)
-  {
-    # here is an opportunity to check for mysterious problems with local procs
-    return;
-  }
-
-  # get a list of steps still running
-  my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
-  chop @squeue;
-  if ($squeue[-1] ne "ok")
-  {
-    return;
-  }
-  pop @squeue;
-
-  # which of my jobsteps are running, according to squeue?
-  my %ok;
-  foreach (@squeue)
-  {
-    if (/^(\d+)\.(\d+) (\S+)/)
-    {
-      if ($1 eq $ENV{SLURM_JOBID})
-      {
-       $ok{$3} = 1;
-      }
-    }
-  }
-
-  # which of my active child procs (>60s old) were not mentioned by squeue?
-  foreach (keys %proc)
-  {
-    if ($proc{$_}->{time} < time - 60
-       && !exists $ok{$proc{$_}->{jobstepname}}
-       && !exists $proc{$_}->{killtime})
-    {
-      # kill this proc if it hasn't exited in 30 seconds
-      $proc{$_}->{killtime} = time + 30;
-    }
-  }
-}
-
-
-sub release_allocation
-{
-  if ($have_slurm)
-  {
-    Log (undef, "release job allocation");
-    system "scancel $ENV{SLURM_JOBID}";
-  }
-}
-
-
-sub readfrompipes
-{
-  my $gotsome = 0;
-  foreach my $job (keys %reader)
-  {
-    my $buf;
-    while (0 < sysread ($reader{$job}, $buf, 8192))
-    {
-      print STDERR $buf if $ENV{CRUNCH_DEBUG};
-      $jobstep[$job]->{stderr} .= $buf;
-      preprocess_stderr ($job);
-      if (length ($jobstep[$job]->{stderr}) > 16384)
-      {
-       substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
-      }
-      $gotsome = 1;
-    }
-  }
-  return $gotsome;
-}
-
-
-sub preprocess_stderr
-{
-  my $job = shift;
-
-  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
-    my $line = $1;
-    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
-    Log ($job, "stderr $line");
-    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
-      # whoa.
-      $main::please_freeze = 1;
-    }
-    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
-      $jobstep[$job]->{node_fail} = 1;
-      ban_node_by_slot($jobstep[$job]->{slotindex});
-    }
-  }
-}
-
-
-sub process_stderr
-{
-  my $job = shift;
-  my $success = shift;
-  preprocess_stderr ($job);
-
-  map {
-    Log ($job, "stderr $_");
-  } split ("\n", $jobstep[$job]->{stderr});
-}
-
-
-sub collate_output
-{
-  my $whc = Warehouse->new;
-  Log (undef, "collate");
-  $whc->write_start (1);
-  my $joboutput;
-  for (@jobstep)
-  {
-    next if (!exists $_->{'arvados_task'}->{output} ||
-             !$_->{'arvados_task'}->{'success'} ||
-             $_->{'exitcode'} != 0);
-    my $output = $_->{'arvados_task'}->{output};
-    if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
-    {
-      $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($output);
-    }
-    elsif (@jobstep == 1)
-    {
-      $joboutput = $output;
-      $whc->write_finish;
-    }
-    elsif (defined (my $outblock = $whc->fetch_block ($output)))
-    {
-      $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($outblock);
-    }
-    else
-    {
-      my $errstr = $whc->errstr;
-      $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
-      $success = 0;
-    }
-  }
-  $joboutput = $whc->write_finish if !defined $joboutput;
-  if ($joboutput)
-  {
-    Log (undef, "output $joboutput");
-    $Job->{'output'} = $joboutput;
-    $Job->save if $job_has_uuid;
-  }
-  else
-  {
-    Log (undef, "output undef");
-  }
-  return $joboutput;
-}
-
-
-sub killem
-{
-  foreach (@_)
-  {
-    my $sig = 2;               # SIGINT first
-    if (exists $proc{$_}->{"sent_$sig"} &&
-       time - $proc{$_}->{"sent_$sig"} > 4)
-    {
-      $sig = 15;               # SIGTERM if SIGINT doesn't work
-    }
-    if (exists $proc{$_}->{"sent_$sig"} &&
-       time - $proc{$_}->{"sent_$sig"} > 4)
-    {
-      $sig = 9;                        # SIGKILL if SIGTERM doesn't work
-    }
-    if (!exists $proc{$_}->{"sent_$sig"})
-    {
-      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
-      kill $sig, $_;
-      select (undef, undef, undef, 0.1);
-      if ($sig == 2)
-      {
-       kill $sig, $_;     # srun wants two SIGINT to really interrupt
-      }
-      $proc{$_}->{"sent_$sig"} = time;
-      $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
-    }
-  }
-}
-
-
-sub fhbits
-{
-  my($bits);
-  for (@_) {
-    vec($bits,fileno($_),1) = 1;
-  }
-  $bits;
-}
-
-
-sub Log                                # ($jobstep_id, $logmessage)
-{
-  if ($_[1] =~ /\n/) {
-    for my $line (split (/\n/, $_[1])) {
-      Log ($_[0], $line);
-    }
-    return;
-  }
-  my $fh = select STDERR; $|=1; select $fh;
-  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
-  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
-  $message .= "\n";
-  my $datetime;
-  if ($metastream || -t STDERR) {
-    my @gmtime = gmtime;
-    $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
-                        $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
-  }
-  print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
-
-  return if !$metastream;
-  $metastream->write_data ($datetime . " " . $message);
-}
-
-
-sub croak
-{
-  my ($package, $file, $line) = caller;
-  my $message = "@_ at $file line $line\n";
-  Log (undef, $message);
-  freeze() if @jobstep_todo;
-  collate_output() if @jobstep_todo;
-  cleanup();
-  save_meta() if $metastream;
-  die;
-}
-
-
-sub cleanup
-{
-  return if !$job_has_uuid;
-  $Job->reload;
-  $Job->{'running'} = 0;
-  $Job->{'success'} = 0;
-  $Job->{'finished_at'} = gmtime;
-  $Job->save;
-}
-
-
-sub save_meta
-{
-  my $justcheckpoint = shift; # false if this will be the last meta saved
-  my $m = $metastream;
-  $m = $m->copy if $justcheckpoint;
-  $m->write_finish;
-  my $loglocator = $m->as_key;
-  undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
-  Log (undef, "meta key is $loglocator");
-  $Job->{'log'} = $loglocator;
-  $Job->save if $job_has_uuid;
-}
-
-
-sub freeze_if_want_freeze
-{
-  if ($main::please_freeze)
-  {
-    release_allocation();
-    if (@_)
-    {
-      # kill some srun procs before freeze+stop
-      map { $proc{$_} = {} } @_;
-      while (%proc)
-      {
-       killem (keys %proc);
-       select (undef, undef, undef, 0.1);
-       my $died;
-       while (($died = waitpid (-1, WNOHANG)) > 0)
-       {
-         delete $proc{$died};
-       }
-      }
-    }
-    freeze();
-    collate_output();
-    cleanup();
-    save_meta();
-    exit 0;
-  }
-}
-
-
-sub freeze
-{
-  Log (undef, "Freeze not implemented");
-  return;
-}
-
-
-sub thaw
-{
-  croak ("Thaw not implemented");
-
-  my $whc;
-  my $key = shift;
-  Log (undef, "thaw from $key");
-
-  @jobstep = ();
-  @jobstep_done = ();
-  @jobstep_todo = ();
-  @jobstep_tomerge = ();
-  $jobstep_tomerge_level = 0;
-  my $frozenjob = {};
-
-  my $stream = new Warehouse::Stream ( whc => $whc,
-                                      hash => [split (",", $key)] );
-  $stream->rewind;
-  while (my $dataref = $stream->read_until (undef, "\n\n"))
-  {
-    if ($$dataref =~ /^job /)
-    {
-      foreach (split ("\n", $$dataref))
-      {
-       my ($k, $v) = split ("=", $_, 2);
-       $frozenjob->{$k} = freezeunquote ($v);
-      }
-      next;
-    }
-
-    if ($$dataref =~ /^merge (\d+) (.*)/)
-    {
-      $jobstep_tomerge_level = $1;
-      @jobstep_tomerge
-         = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
-      next;
-    }
-
-    my $Jobstep = { };
-    foreach (split ("\n", $$dataref))
-    {
-      my ($k, $v) = split ("=", $_, 2);
-      $Jobstep->{$k} = freezeunquote ($v) if $k;
-    }
-    $Jobstep->{attempts} = 0;
-    push @jobstep, $Jobstep;
-
-    if ($Jobstep->{exitcode} eq "0")
-    {
-      push @jobstep_done, $#jobstep;
-    }
-    else
-    {
-      push @jobstep_todo, $#jobstep;
-    }
-  }
-
-  foreach (qw (script script_version script_parameters))
-  {
-    $Job->{$_} = $frozenjob->{$_};
-  }
-  $Job->save if $job_has_uuid;
-}
-
-
-sub freezequote
-{
-  my $s = shift;
-  $s =~ s/\\/\\\\/g;
-  $s =~ s/\n/\\n/g;
-  return $s;
-}
-
-
-sub freezeunquote
-{
-  my $s = shift;
-  $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
-  return $s;
-}
-
-
-sub srun
-{
-  my $srunargs = shift;
-  my $execargs = shift;
-  my $opts = shift || {};
-  my $stdin = shift;
-  my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
-  print STDERR (join (" ",
-                     map { / / ? "'$_'" : $_ }
-                     (@$args)),
-               "\n")
-      if $ENV{CRUNCH_DEBUG};
-
-  if (defined $stdin) {
-    my $child = open STDIN, "-|";
-    defined $child or die "no fork: $!";
-    if ($child == 0) {
-      print $stdin or die $!;
-      close STDOUT or die $!;
-      exit 0;
-    }
-  }
-
-  return system (@$args) if $opts->{fork};
-
-  exec @$args;
-  warn "ENV size is ".length(join(" ",%ENV));
-  die "exec failed: $!: @$args";
-}
-
-
-sub ban_node_by_slot {
-  # Don't start any new jobsteps on this node for 60 seconds
-  my $slotid = shift;
-  $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
-  $slot[$slotid]->{node}->{hold_count}++;
-  Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
-}
-
-__DATA__
-#!/usr/bin/perl
-
-# checkout-and-build
-
-use Fcntl ':flock';
-
-my $destdir = $ENV{"CRUNCH_SRC"};
-my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
-my $repo = $ENV{"CRUNCH_SRC_URL"};
-
-open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
-flock L, LOCK_EX;
-if (readlink ("$destdir.commit") eq $commit) {
-    exit 0;
-}
-
-unlink "$destdir.commit";
-open STDOUT, ">", "$destdir.log";
-open STDERR, ">&STDOUT";
-
-mkdir $destdir;
-open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
-print TARX <DATA>;
-if(!close(TARX)) {
-  die "'tar -C $destdir -xf -' exited $?: $!";
-}
-
-my $pwd;
-chomp ($pwd = `pwd`);
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
-mkdir $install_dir;
-if (-e "$destdir/crunch_scripts/install") {
-    shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
-} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
-    # Old version
-    shell_or_die ("./tests/autotests.sh", $install_dir);
-} elsif (-e "./install.sh") {
-    shell_or_die ("./install.sh", $install_dir);
-}
-
-if ($commit) {
-    unlink "$destdir.commit.new";
-    symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
-    rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
-}
-
-close L;
-
-exit 0;
-
-sub shell_or_die
-{
-  if ($ENV{"DEBUG"}) {
-    print STDERR "@_\n";
-  }
-  system (@_) == 0
-      or die "@_ failed: $! exit 0x".sprintf("%x",$?);
-}
-
-__DATA__
new file mode 120000 (symlink)
index 0000000000000000000000000000000000000000..ff0e7022bfba75805c3d02a03e5a47c4ce7699d0
--- /dev/null
@@ -0,0 +1 @@
+../../sdk/cli/bin/arv-crunch-job
\ No newline at end of file