copy whjobmanager from warehouse-apps
authorTom Clegg <tom@clinicalfuture.com>
Tue, 14 May 2013 19:52:39 +0000 (12:52 -0700)
committerTom Clegg <tom@clinicalfuture.com>
Tue, 14 May 2013 19:52:39 +0000 (12:52 -0700)
services/crunch/crunch-job [new file with mode: 0755]

diff --git a/services/crunch/crunch-job b/services/crunch/crunch-job
new file mode 100755 (executable)
index 0000000..e884a44
--- /dev/null
@@ -0,0 +1,1551 @@
+#!/usr/bin/perl
+# -*- mode: perl; perl-indent-level: 2; -*-
+
+=head1 NAME
+
+whjobmanager: Execute job steps, save snapshots as requested, collate output.
+
+=head1 SYNOPSIS
+
+Obtain job details from database, run tasks on compute nodes
+(typically invoked by scheduler on cloud controller):
+
+ whjobmanager jobid
+
+Obtain job details from command line, run tasks on local machine
+(typically invoked by application or developer on VM):
+
+ whjobmanager revision=PATH mrfunction=FUNC inputkey=LOCATOR \
+              [stepspernode=N] [SOMEKNOB=value] ...
+
+=head1 RUNNING JOBS LOCALLY
+
+whjobmanager(1p)'s log messages appear on stderr, and are saved in the
+warehouse at each checkpoint and when the job finishes.
+
+If the job succeeds, the job's output locator is printed on stdout.
+
+If a job step outputs anything to stderr, it appears in
+whjobmanager(1p)'s log when the step finishes.
+
+While the job is running, the following signals are accepted:
+
+=over
+
+=item control-C, SIGINT, SIGQUIT
+
+Save a checkpoint, terminate any job steps that are running, and stop.
+
+=item SIGALRM
+
+Save a checkpoint and continue.
+
+=back
+
+=head1 SEE ALSO
+
+whintro(1p), wh(1p)
+
+=cut
+
+
+use strict;
+use DBI;
+use POSIX ':sys_wait_h';
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Warehouse;
+use Warehouse::Stream;
+
+$ENV{"TMPDIR"} ||= "/tmp";
+
+do '/etc/warehouse/warehouse-server.conf';
+
+my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
+my $have_database = @ARGV == 1 && $ARGV[0] =~ /^\d+$/;
+
+
+$SIG{'USR1'} = sub
+{
+  $main::ENV{MR_DEBUG} = 1;
+};
+$SIG{'USR2'} = sub
+{
+  $main::ENV{MR_DEBUG} = 0;
+};
+
+
+
+my $whc = new Warehouse or croak ("failed to create Warehouse client");
+my $metastream = new Warehouse::Stream (whc => $whc);
+$metastream->clear;
+$metastream->name (".");
+$metastream->write_start ("log.txt");
+
+
+
+my $Job = {};
+my $job_id;
+my $dbh;
+my $sth;
+if ($have_database)
+{
+  ($job_id) = @ARGV;
+
+  $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
+  croak ($DBI::errstr) if !$dbh;
+  $dbh->{InactiveDestroy} = 1;
+
+  $sth = $dbh->prepare ("select * from mrjob where id=?");
+  $sth->execute ($job_id) or croak ($dbh->errstr);
+  $Job = $sth->fetchrow_hashref or croak ($sth->errstr);
+}
+
+else
+{
+  my %knob;
+  foreach (@ARGV)
+  {
+    if (/([a-z].*?)=(.*)/) {
+      $Job->{$1} = $2;
+    } elsif (/(.*?)=(.*)/) {
+      $knob{$1} = $2;
+    }
+  }
+  $Job->{knobs} = join ("\n", map { "$_=$knob{$_}" } sort keys %knob);
+
+  if (!$Job->{thawedfromkey})
+  {
+    map { croak ("No $_ specified") unless $Job->{$_} }
+    qw(mrfunction revision inputkey);
+  }
+
+  if (!defined $Job->{id}) {
+    chomp ($Job->{id} = sprintf ("%d.%d\@%s", time, $$, `hostname`));
+  }
+  $job_id = $Job->{id};
+}
+
+
+
+$Job->{inputkey} = $Job->{input0} if !exists $Job->{inputkey};
+delete $Job->{input0};
+
+
+
+my $max_ncpus;
+map { $max_ncpus = $1 if /^STEPSPERNODE=(.*)/ } split ("\n", $$Job{knobs});
+$max_ncpus = $1 if $$Job{nodes} =~ /\&(\d+)/;
+$max_ncpus = $$Job{stepspernode} if $$Job{stepspernode};
+my $maxstepspernode;
+
+
+
+Log (undef, "check slurm allocation");
+my @slot;
+my @node;
+# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
+my @sinfo;
+if (!$have_slurm)
+{
+  my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
+  push @sinfo, "$localcpus localhost";
+}
+if (exists $ENV{SLURM_NODELIST})
+{
+  push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
+}
+foreach (@sinfo)
+{
+  my ($ncpus, $slurm_nodelist) = split;
+  $ncpus = $max_ncpus if defined ($max_ncpus) && $ncpus > $max_ncpus && $max_ncpus > 0;
+  $maxstepspernode = $ncpus if !defined $maxstepspernode || $maxstepspernode < $ncpus;
+
+  my @nodelist;
+  while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
+  {
+    my $nodelist = $1;
+    if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
+    {
+      my $ranges = $1;
+      foreach (split (",", $ranges))
+      {
+       my ($a, $b);
+       if (/(\d+)-(\d+)/)
+       {
+         $a = $1;
+         $b = $2;
+       }
+       else
+       {
+         $a = $_;
+         $b = $_;
+       }
+       push @nodelist, map {
+         my $n = $nodelist;
+         $n =~ s/\[[-,\d]+\]/$_/;
+         $n;
+       } ($a..$b);
+      }
+    }
+    else
+    {
+      push @nodelist, $nodelist;
+    }
+  }
+  foreach my $nodename (@nodelist)
+  {
+    Log (undef, "node $nodename - $ncpus slots");
+    my $node = { name => $nodename,
+                ncpus => $ncpus,
+                losing_streak => 0,
+                hold_until => 0 };
+    foreach my $cpu (1..$ncpus)
+    {
+      push @slot, { node => $node,
+                   cpu => $cpu };
+    }
+  }
+  push @node, @nodelist;
+}
+
+
+
+# Ensure that we get one jobstep running on each allocated node before
+# we start overloading nodes with concurrent steps
+
+@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
+
+
+
+my $jobmanager_id;
+if ($have_database)
+{
+  # Claim this job, and make sure nobody else does
+
+  $sth = $dbh->prepare ("insert into mrjobmanager
+                        (pid, revision, starttime)
+                        values (?, ?, now())");
+  my $rev = q/$Revision$/;
+  $rev =~ /\d+/;
+  $sth->execute ($$, +$&) or croak ($dbh->errstr);
+
+  $sth = $dbh->prepare ("select last_insert_id()");
+  $sth->execute or croak ($dbh->errstr);
+  ($jobmanager_id) = $sth->fetchrow_array;
+
+  $sth = $dbh->prepare ("update mrjob set jobmanager_id=?, starttime=now()
+                        where id=? and jobmanager_id is null");
+  $sth->execute ($jobmanager_id, $job_id) or croak ($dbh->errstr);
+
+  $sth = $dbh->prepare ("select jobmanager_id from mrjob
+                        where id=?");
+  $sth->execute ($job_id) or croak ($dbh->errstr);
+  my ($check_jobmanager_id) = $sth->fetchrow_array;
+  if ($check_jobmanager_id != $jobmanager_id)
+  {
+    # race condition - another job manager proc stole the job
+    Log (undef,
+        "job taken by jobmanager id $check_jobmanager_id");
+    exit (1);
+  }
+}
+
+
+Log (undef, "start");
+$SIG{'INT'} = sub { $main::please_freeze = 1; };
+$SIG{'QUIT'} = sub { $main::please_freeze = 1; };
+$SIG{'TERM'} = \&croak;
+$SIG{'TSTP'} = sub { $main::please_freeze = 1; };
+$SIG{'ALRM'} = sub { $main::please_info = 1; };
+$SIG{'CONT'} = sub { $main::please_continue = 1; };
+$main::please_freeze = 0;
+$main::please_info = 0;
+$main::please_continue = 0;
+my $jobsteps_must_output_keys = 0;     # becomes 1 when any task outputs a key
+
+grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
+$ENV{"MR_JOB_ID"} = $job_id;
+$ENV{"JOB_UUID"} = $job_id;
+
+
+my @jobstep;
+my @jobstep_todo = ();
+my @jobstep_done = ();
+my @jobstep_tomerge = ();
+my $jobstep_tomerge_level = 0;
+my $squeue_checked;
+my $squeue_kill_checked;
+my $output_in_keep = 0;
+
+
+
+if (defined $Job->{thawedfromkey})
+{
+  thaw ($Job->{thawedfromkey});
+}
+else
+{
+  push @jobstep, { input => $Job->{inputkey},
+                  level => 0,
+                  attempts => 0,
+                };
+  push @jobstep_todo, 0;
+}
+
+
+
+mkdir ($ENV{"TMPDIR"}."/mrcompute");
+if ($$Job{knobs} =~ /^GPG_KEYS=(.*)/m) {
+  # set up a fresh gnupg directory just for this process
+  # TODO: reap abandoned gnupg dirs
+  system ("rm", "-rf", $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$");
+  mkdir ($ENV{"TMPDIR"}."/mrcompute");
+  mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg", 0700);
+  mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg/$$", 0700) || croak ("mkdir: $!");
+
+  my $newhomedir = $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$";
+
+  open C, ">", $newhomedir."/gpg.conf";
+  print C "always-trust\n";
+  close C;
+
+  # import secret keys referenced in job spec
+  my $hashes = $1;
+  $hashes =~ s/\'/\'\\\'\'/g;
+  my $gpg_out = `whget '$hashes' - | gpg --homedir "$newhomedir" --import 2>&1`;
+  my %encrypt_to;
+  while ($gpg_out =~ /^gpg: key ([0-9A-F]{8}): /gm) {
+    my $keynum = $1;
+    while (`gpg --homedir "$newhomedir" --list-keys "$keynum"` =~ /^uid\s.*<(.+?)>/gm) {
+      $encrypt_to{$1} = 1;
+    }
+  }
+  if (!%encrypt_to) {
+    croak ("GPG_KEYS provided but failed to import keys:\n$gpg_out");
+  }
+
+  if ($have_database) {
+
+    # make sure the job request was signed by all of the secret keys
+    # contained in GPG_KEYS (otherwise, any VM can just copy the
+    # GPG_KEYS hash from an existing mr-job and submit new jobs that can
+    # read private data)
+
+    my %did_not_sign;
+    my $seckeys = `gpg --homedir "$newhomedir" --list-secret-keys --with-fingerprint`;
+    while ($seckeys =~ /Key fingerprint.*?([0-9A-F][0-9A-F ]+[0-9A-F])/mgi) {
+      $did_not_sign{$1} = 1;
+    }
+    my $srfile = "$newhomedir/signedrequest";
+    open SREQ, ">", $srfile;
+    print SREQ $$Job{"signedrequest"};
+    close SREQ;
+    my $gpg_v = `gpg --homedir "$newhomedir" --verify --with-fingerprint "$srfile" 2>&1 && echo ok`;
+    unlink $srfile;
+    if ($gpg_v =~ /\nok\n$/s) {
+      while ($gpg_v =~ /Good signature.*? key fingerprint: (\S[^\n]+\S)/sgi) {
+       delete $did_not_sign{$1};
+      }
+    }
+    if (%did_not_sign) {
+      croak (join ("\n",
+                  "Some secret keys provided did not sign this job request:",
+                  keys %did_not_sign) . "\n");
+    }
+  }
+
+  my $hostname = `hostname`;
+  chomp ($hostname);
+
+  # tell mrjobsteps the decrypted secret key(s) and all public key(s) they might need
+  $ENV{"GPG_KEYS"} = `gpg --homedir "$newhomedir" --export-secret-keys --armor`;
+  $ENV{"GPG_PUBLIC_KEYS"} = `gpg --export --armor | ENCRYPT_TO= whput -`;
+
+  # import all secret keys from my real home dir
+  `gpg --export-secret-keys | gpg --homedir "$newhomedir" --import 2>&1`;
+
+  # use the new gnupg dir from now on
+  $ENV{"GNUPGHOME"} = $newhomedir;
+
+  # if I have a secret key for root@{host} or {user}@{host} or
+  # {configured-controller-gpg-uid}, add that as a recipient too so
+  # I'll be able to read frozentokeys etc. later
+  my %allkeys;
+  while (`gpg --list-secret-keys` =~ /^uid\s.*?<(.+?)>/gm) {
+    $allkeys{$1} = 1;
+  }
+  my $encrypting_to_self = 0;
+  my @try_these_uids = ("root\@".$hostname, $ENV{"USER"}."\@".$hostname);
+  push @try_these_uids, ($whc->{config}->{controller_gpg_uid})
+      if exists $whc->{config}->{controller_gpg_uid};
+  foreach my $id (@try_these_uids) {
+    if (exists $allkeys{$id}) {
+      $encrypt_to{$id} = 1;
+      $encrypting_to_self = 1;
+      last;
+    }
+  }
+
+  if (!$encrypting_to_self) {
+    croak ("Failed to find a secret key for any of [@try_these_uids] -- giving up instead of writing meta/freeze data that I won't be able to read");
+  }
+
+  # tell the client library (and child procs and jobsteps) to encrypt using these keys
+  $ENV{"ENCRYPT_TO"} = join (",", sort keys %encrypt_to);
+  Log (undef, "encrypt_to ('".$ENV{"ENCRYPT_TO"}."')");
+  $whc->set_config ("encrypt_to", $ENV{"ENCRYPT_TO"});
+}
+
+
+
+$ENV{"MR_REVISION"} = $Job->{revision};
+
+my $git_build_script;
+my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
+if ($skip_install)
+{
+  $ENV{"MR_REVISION_SRCDIR"} = $Job->{revision};
+}
+else
+{
+  Log (undef, "Install revision ".$Job->{revision});
+  my $nodelist = join(",", @node);
+
+  # Clean out mrcompute/work and mrcompute/opt
+
+  my $cleanpid = fork();
+  if ($cleanpid == 0)
+  {
+    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{TMPDIR}],
+         ['bash', '-c', 'if mount | grep -q $TMPDIR/mrcompute/work/; then sudo /bin/umount $TMPDIR/mrcompute/work/* 2>/dev/null; fi; sleep 1; rm -rf $TMPDIR/mrcompute/work $TMPDIR/mrcompute/opt']);
+    exit (1);
+  }
+  while (1)
+  {
+    last if $cleanpid == waitpid (-1, WNOHANG);
+    freeze_if_want_freeze ($cleanpid);
+    select (undef, undef, undef, 0.1);
+  }
+  Log (undef, "Clean-work-dir exited $?");
+
+  # Install requested code version
+
+  my $build_script;
+  my @execargs;
+  my @srunargs = ("srun",
+                 "--nodelist=$nodelist",
+                 "-D", $ENV{TMPDIR}, "--job-name=$job_id");
+
+  $ENV{"MR_REVISION"} = $Job->{revision};
+  $ENV{"MR_REVISION_SRCDIR"} = "$ENV{TMPDIR}/mrcompute/warehouse-apps";
+  $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/opt";
+
+  my $commit;
+  my $treeish = $Job->{revision};
+  my $repo = $Job->{git_clone_url} || $whc->get_config("git_clone_url");
+
+  # Create/update our clone of the remote git repo
+
+  if (!-d $ENV{MR_REVISION_SRCDIR}) {
+    system(qw(git clone), $repo, $ENV{MR_REVISION_SRCDIR}) == 0
+       or croak ("git clone $repo failed: exit ".($?>>8));
+    system("cd $ENV{MR_REVISION_SRCDIR} && git config clean.requireForce false");
+  }
+  `cd $ENV{MR_REVISION_SRCDIR} && git fetch -q`;
+
+  # If this looks like a subversion r#, look for it in git-svn commit messages
+
+  if ($treeish =~ m{^\d{1,4}$}) {
+    my $gitlog = `cd $ENV{MR_REVISION_SRCDIR} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
+    chomp $gitlog;
+    if ($gitlog =~ /^[a-f0-9]{40}$/) {
+      $commit = $gitlog;
+      Log (undef, "Using commit $commit for revision $treeish");
+    }
+  }
+
+  # If that didn't work, try asking git to look it up as a tree-ish.
+
+  if (!defined $commit) {
+
+    my $cooked_treeish = $treeish;
+    if ($treeish !~ m{^[0-9a-f]{5,}$}) {
+      # Looks like a git branch name -- make sure git knows it's
+      # relative to the remote repo
+      $cooked_treeish = "origin/$treeish";
+    }
+
+    my $found = `cd $ENV{MR_REVISION_SRCDIR} && git rev-list -1 $cooked_treeish`;
+    chomp $found;
+    if ($found =~ /^[0-9a-f]{40}$/s) {
+      $commit = $found;
+      if ($commit ne $treeish) {
+       # Make sure we record the real commit id in the database,
+       # frozentokey, logs, etc. -- instead of an abbreviation or a
+       # branch name which can become ambiguous or point to a
+       # different commit in the future.
+       $ENV{"MR_REVISION"} = $commit;
+       $Job->{revision} = $commit;
+       dbh_do
+           ("update mrjob set revision=? where id=?",
+            undef,
+            $Job->{revision}, $Job->{id});
+       Log (undef, "Using commit $commit for tree-ish $treeish");
+      }
+    }
+  }
+
+  if (defined $commit) {
+    $ENV{"MR_GIT_COMMIT"} = $commit;
+    $ENV{"MR_GIT_CLONE_URL"} = $repo;
+    @execargs = ("sh", "-c",
+                "mkdir -p $ENV{TMPDIR}/mrcompute/opt && cd $ENV{TMPDIR}/mrcompute && perl - $ENV{MR_REVISION_SRCDIR} $commit $repo");
+    open GBS, "<", `echo -n \$(which whjob-checkout-and-build)`
+       or croak ("can't find whjob-checkout-and-build");
+    local $/ = undef;
+    $git_build_script = <GBS>;
+    close GBS;
+    $build_script = $git_build_script;
+  }
+  elsif ($treeish =~ m{^(\d{1,5})$}) {
+    # Want a subversion r# but couldn't find it in git-svn history -
+    # might as well try using the subversion repo in case it's still
+    # there.
+    $ENV{"INSTALL_REPOS"} = $whc->get_config("svn_root");
+    $ENV{"INSTALL_REVISION"} = $Job->{revision};
+    $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/revision/$treeish";
+    $ENV{"MR_REVISION_SRCDIR"} = "$ENV{MR_REVISION_INSTALLDIR}/src";
+    @execargs = ("sh", "-c",
+                "mkdir -p $ENV{TMPDIR}/mrcompute/revision && cd $ENV{TMPDIR}/mrcompute && ( [ -e $ENV{MR_REVISION_INSTALLDIR}/.tested ] || ( svn export --quiet \"\$INSTALL_REPOS/installrevision\" && INSTALLREVISION_NOLOCK=1 ./installrevision ) )");
+  }
+  else {
+    croak ("could not figure out commit id for $treeish");
+  }
+
+  my $installpid = fork();
+  if ($installpid == 0)
+  {
+    srun (\@srunargs, \@execargs, {}, $build_script);
+    exit (1);
+  }
+  while (1)
+  {
+    last if $installpid == waitpid (-1, WNOHANG);
+    freeze_if_want_freeze ($installpid);
+    select (undef, undef, undef, 0.1);
+  }
+  Log (undef, "Install exited $?");
+}
+
+
+
+foreach (qw (mrfunction revision nodes stepspernode inputkey))
+{
+  Log (undef, $_ . " " . $Job->{$_});
+}
+foreach (split (/\n/, $Job->{knobs}))
+{
+  Log (undef, "knob " . $_);
+}
+
+
+
+my $success;
+
+
+
+ONELEVEL:
+
+my $thisround_succeeded = 0;
+my $thisround_failed = 0;
+my $thisround_failed_multiple = 0;
+
+@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
+                      or $a <=> $b } @jobstep_todo;
+my $level = $jobstep[$jobstep_todo[0]]->{level};
+Log (undef, "start level $level");
+
+
+
+my %proc;
+my @freeslot = (0..$#slot);
+my @holdslot;
+my %reader;
+my ($id, $input, $attempts);
+my $progress_is_dirty = 1;
+my $progress_stats_updated = 0;
+
+update_progress_stats();
+
+
+
+THISROUND:
+for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
+{
+  $main::please_continue = 0;
+
+  my $id = $jobstep_todo[$todo_ptr];
+  my $Jobstep = $jobstep[$id];
+  if ($Jobstep->{level} != $level)
+  {
+    next;
+  }
+  if ($Jobstep->{attempts} > 9)
+  {
+    Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
+    $success = 0;
+    last THISROUND;
+  }
+
+  pipe $reader{$id}, "writer" or croak ($!);
+  my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
+  fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+
+  my $childslot = $freeslot[0];
+  my $childnode = $slot[$childslot]->{node};
+  my $childslotname = join (".",
+                           $slot[$childslot]->{node}->{name},
+                           $slot[$childslot]->{cpu});
+  my $childpid = fork();
+  if ($childpid == 0)
+  {
+    $SIG{'INT'} = 'DEFAULT';
+    $SIG{'QUIT'} = 'DEFAULT';
+    $SIG{'TERM'} = 'DEFAULT';
+
+    foreach (values (%reader))
+    {
+      close($_);
+    }
+    fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
+    open(STDOUT,">&writer");
+    open(STDERR,">&writer");
+
+    undef $dbh;
+    undef $sth;
+
+
+    delete $ENV{"GNUPGHOME"};
+    $ENV{"MR_ID"} = $id;
+    $ENV{"MR_INPUT"} = $Jobstep->{input};
+    $ENV{"MR_KNOBS"} = $Job->{knobs};
+    $ENV{"MR_LEVEL"} = $level;
+    $ENV{"MR_FUNCTION"} = $Job->{mrfunction};
+    $ENV{"MR_INPUT0"} = $Job->{inputkey};
+    $ENV{"MR_INPUTKEY"} = $Job->{inputkey};
+    $ENV{"MR_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
+    $ENV{"MR_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
+    $ENV{"MR_SLOT"} = $slot[$childslot]->{cpu}; # deprecated
+    $ENV{"MR_JOB_TMP"} = $ENV{"TMPDIR"}."/job/work";
+    $ENV{"MR_JOBSTEP_TMP"} = $ENV{"TMPDIR"}."/job/work/".$slot[$childslot]->{cpu};
+    $ENV{"MR_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+    $ENV{"MOGILEFS_TRACKERS"} = join (",", @main::mogilefs_trackers);
+    $ENV{"MOGILEFS_DOMAIN"} = $main::mogilefs_default_domain;
+    $ENV{"MOGILEFS_CLASS"} = $main::mogilefs_default_class;
+
+    $ENV{"TASK_UUID"} = $ENV{"JOB_UUID"} . "-" . $id;
+    $ENV{"TASK_QSEQUENCE"} = $id;
+    $ENV{"TASK_SEQUENCE"} = $Jobstep->{level};
+
+    $ENV{"GZIP"} = "-n";
+
+    my @srunargs = (
+      "srun",
+      "--nodelist=".$childnode->{name},
+      qw(-n1 -c1 -N1 -D), $ENV{TMPDIR},
+      "--job-name=$job_id.$id.$$",
+       );
+    my @execargs = qw(sh);
+    my $script = "";
+    my $command =
+       "mkdir -p $ENV{TMPDIR}/mrcompute/revision "
+       ."&& cd $ENV{TMPDIR}/mrcompute ";
+    if ($git_build_script)
+    {
+      $script = $git_build_script;
+      $command .=
+         "&& perl - $ENV{MR_REVISION_SRCDIR} $ENV{MR_GIT_COMMIT} $ENV{MR_GIT_CLONE_URL}";
+    }
+    elsif (!$skip_install)
+    {
+      $command .=
+         "&& "
+         ."( "
+         ."  [ -e '$ENV{MR_REVISION_INSTALLDIR}/.tested' ] "
+         ."|| "
+         ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
+         ."    && ./installrevision "
+         ."  ) "
+         .") ";
+    }
+    if (exists $ENV{GPG_KEYS}) {
+      $command .=
+         "&& mkdir -p '$ENV{MR_JOBSTEP_TMP}' && (sudo /bin/umount '$ENV{MR_JOBSTEP_TMP}' 2>/dev/null || true) && rm -rf '$ENV{MR_JOBSTEP_TMP}' && exec $ENV{MR_REVISION_SRCDIR}/mapreduce/ecryptfs-wrapper -d '$ENV{MR_JOBSTEP_TMP}' -p $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager";
+    } else {
+      $command .=
+         "&& exec $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager";
+    }
+    my @execargs = ('bash', '-c', $command);
+    srun (\@srunargs, \@execargs, undef, $script);
+    exit (1);
+  }
+  close("writer");
+  if (!defined $childpid)
+  {
+    close $reader{$id};
+    delete $reader{$id};
+    next;
+  }
+  shift @freeslot;
+  $proc{$childpid} = { jobstep => $id,
+                      time => time,
+                      slot => $childslot,
+                      jobstepname => "$job_id.$id.$childpid",
+                    };
+  croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
+  $slot[$childslot]->{pid} = $childpid;
+
+  Log ($id, "child $childpid started on $childslotname");
+  $Jobstep->{attempts} ++;
+  $Jobstep->{starttime} = time;
+  $Jobstep->{node} = $childnode->{name};
+  $Jobstep->{slotindex} = $childslot;
+  delete $Jobstep->{stderr};
+  delete $Jobstep->{output};
+  delete $Jobstep->{finishtime};
+
+  splice @jobstep_todo, $todo_ptr, 1;
+  --$todo_ptr;
+
+  $progress_is_dirty = 1;
+
+  while (!@freeslot
+        ||
+        (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
+  {
+    last THISROUND if $main::please_freeze;
+    if ($main::please_info)
+    {
+      $main::please_info = 0;
+      freeze();
+      collate_output();
+      save_meta(1);
+      update_progress_stats();
+    }
+    my $gotsome
+       = readfrompipes ()
+       + reapchildren ();
+    if (!$gotsome)
+    {
+      check_squeue();
+      update_progress_stats();
+      select (undef, undef, undef, 0.1);
+    }
+    elsif (time - $progress_stats_updated >= 30)
+    {
+      update_progress_stats();
+    }
+    if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
+       ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
+    {
+      my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
+         .($thisround_failed+$thisround_succeeded)
+         .") -- giving up on this round";
+      Log (undef, $message);
+      last THISROUND;
+    }
+
+    # move slots from freeslot to holdslot (or back to freeslot) if necessary
+    for (my $i=$#freeslot; $i>=0; $i--) {
+      if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
+       push @holdslot, (splice @freeslot, $i, 1);
+      }
+    }
+    for (my $i=$#holdslot; $i>=0; $i--) {
+      if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
+       push @freeslot, (splice @holdslot, $i, 1);
+      }
+    }
+
+    # give up if no nodes are succeeding
+    if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
+      my $message = "Every node has failed -- giving up on this round";
+      Log (undef, $message);
+      last THISROUND;
+    }
+  }
+}
+
+
+push @freeslot, splice @holdslot;
+map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
+
+
+Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
+while (%proc)
+{
+  goto THISROUND if $main::please_continue;
+  $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
+  readfrompipes ();
+  if (!reapchildren())
+  {
+    check_squeue();
+    update_progress_stats();
+    select (undef, undef, undef, 0.1);
+    killem (keys %proc) if $main::please_freeze;
+  }
+}
+
+update_progress_stats();
+freeze_if_want_freeze();
+
+
+if (@jobstep_tomerge && !@jobstep_todo)
+{
+  push @jobstep, { input => join ("\n", splice @jobstep_tomerge, 0),
+                  level => $jobstep_tomerge_level,
+                  attempts => 0 };
+  push @jobstep_todo, $#jobstep;
+}
+
+
+if (!defined $success)
+{
+  if (@jobstep_todo &&
+      $thisround_succeeded == 0 &&
+      ($thisround_failed == 0 || $thisround_failed > 4))
+  {
+    my $message = "stop because $thisround_failed tasks failed and none succeeded";
+    Log (undef, $message);
+    $success = 0;
+  }
+  if (!@jobstep_todo)
+  {
+    $success = 1;
+  }
+}
+
+goto ONELEVEL if !defined $success;
+
+
+release_allocation();
+freeze();
+my $key = &collate_output();
+$success = 0 if !$key;
+
+
+if ($key)
+{
+  my @keepkey;
+  foreach my $hash (split ",", $key)
+  {
+    my $keephash = $whc->store_in_keep (hash => $hash,
+                                       nnodes => 3);
+    if (!$keephash)
+    {
+      Log (undef, "store_in_keep (\"$hash\") failed: ".$whc->errstr);
+      $keephash = $hash;
+    }
+    push @keepkey, $keephash;
+  }
+  my $keepkey = join (",", @keepkey);
+  Log (undef, "outputkey+K $keepkey");
+  print "$keepkey\n" if $success;
+
+  if ($output_in_keep)
+  {
+    $key = $keepkey;
+  }
+
+  dbh_do ("update mrjob set output=? where id=?", undef,
+         $key, $job_id)
+      or croak ($dbh->errstr);
+
+  $whc->store_manifest_by_name ($keepkey, undef, "/job$job_id")
+      or Log (undef, "store_manifest_by_name (\"$key\", \"/job$job_id\") failed: ".$whc->errstr);
+}
+
+
+Log (undef, "finish");
+
+dbh_do ("update mrjob set finishtime=now(), success=?
+         where id=? and jobmanager_id=?", undef,
+       $success, $job_id, $jobmanager_id)
+    or croak ($dbh->errstr);
+
+save_meta();
+exit 0;
+
+
+
+sub update_progress_stats
+{
+  $progress_stats_updated = time;
+  return if !$progress_is_dirty;
+  my ($todo, $done, $running) = (scalar @jobstep_todo,
+                                scalar @jobstep_done,
+                                scalar @slot - scalar @freeslot - scalar @holdslot);
+  dbh_do
+      ("update mrjob set steps_todo=?,steps_done=?,steps_running=? where id=?",
+       undef,
+       $todo, $done, $running, $job_id);
+  Log (undef, "status: $done done, $running running, $todo todo");
+  $progress_is_dirty = 0;
+}
+
+
+
+sub reapchildren
+{
+  my $pid = waitpid (-1, WNOHANG);
+  return 0 if $pid <= 0;
+
+  my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
+                 . "."
+                 . $slot[$proc{$pid}->{slot}]->{cpu});
+  my $jobstepid = $proc{$pid}->{jobstep};
+  my $elapsed = time - $proc{$pid}->{time};
+  my $Jobstep = $jobstep[$jobstepid];
+
+  process_stderr_for_output_key ($jobstepid);
+
+  my $exitcode = $?;
+  my $exitinfo = "exit $exitcode";
+  if (!exists $Jobstep->{output})
+  {
+    $exitinfo .= " with no output key";
+    $exitcode = -1 if $exitcode == 0 && $jobsteps_must_output_keys;
+  }
+
+  if ($exitcode == 0 && $Jobstep->{node_fail}) {
+    $exitinfo .= " but recording as failure";
+    $exitcode = -1;
+  }
+
+  Log ($jobstepid, "child $pid on $whatslot $exitinfo");
+
+  if ($exitcode != 0 || $Jobstep->{node_fail})
+  {
+    --$Jobstep->{attempts} if $Jobstep->{node_fail};
+    ++$thisround_failed;
+    ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
+
+    # Check for signs of a failed or misconfigured node
+    if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
+       2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
+      # Don't count this against jobstep failure thresholds if this
+      # node is already suspected faulty and srun exited quickly
+      if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
+         $elapsed < 5 &&
+         $Jobstep->{attempts} > 1) {
+       Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
+       --$Jobstep->{attempts};
+      }
+      ban_node_by_slot($proc{$pid}->{slot});
+    }
+
+    push @jobstep_todo, $jobstepid;
+    Log ($jobstepid, "failure in $elapsed seconds");
+  }
+  else
+  {
+    ++$thisround_succeeded;
+    $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
+    $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+    push @jobstep_done, $jobstepid;
+    Log ($jobstepid, "success in $elapsed seconds");
+  }
+  $Jobstep->{exitcode} = $exitcode;
+  $Jobstep->{finishtime} = time;
+  process_stderr ($jobstepid, $exitcode == 0);
+  Log ($jobstepid, "output $$Jobstep{output}");
+
+  close $reader{$jobstepid};
+  delete $reader{$jobstepid};
+  delete $slot[$proc{$pid}->{slot}]->{pid};
+  push @freeslot, $proc{$pid}->{slot};
+  delete $proc{$pid};
+
+  $progress_is_dirty = 1;
+  1;
+}
+
+
+sub check_squeue
+{
+  # return if the kill list was checked <4 seconds ago
+  if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
+  {
+    return;
+  }
+  $squeue_kill_checked = time;
+
+  # use killem() on procs whose killtime is reached
+  for (keys %proc)
+  {
+    if (exists $proc{$_}->{killtime}
+       && $proc{$_}->{killtime} <= time)
+    {
+      killem ($_);
+    }
+  }
+
+  # return if the squeue was checked <60 seconds ago
+  if (defined $squeue_checked && $squeue_checked > time - 60)
+  {
+    return;
+  }
+  $squeue_checked = time;
+
+  if (!$have_slurm)
+  {
+    # here is an opportunity to check for mysterious problems with local procs
+    return;
+  }
+
+  # get a list of steps still running
+  my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
+  chop @squeue;
+  if ($squeue[-1] ne "ok")
+  {
+    return;
+  }
+  pop @squeue;
+
+  # which of my jobsteps are running, according to squeue?
+  my %ok;
+  foreach (@squeue)
+  {
+    if (/^(\d+)\.(\d+) (\S+)/)
+    {
+      if ($1 eq $ENV{SLURM_JOBID})
+      {
+       $ok{$3} = 1;
+      }
+    }
+  }
+
+  # which of my active child procs (>60s old) were not mentioned by squeue?
+  foreach (keys %proc)
+  {
+    if ($proc{$_}->{time} < time - 60
+       && !exists $ok{$proc{$_}->{jobstepname}}
+       && !exists $proc{$_}->{killtime})
+    {
+      # kill this proc if it hasn't exited in 30 seconds
+      $proc{$_}->{killtime} = time + 30;
+    }
+  }
+}
+
+
+sub release_allocation
+{
+  if ($have_slurm)
+  {
+    Log (undef, "release job allocation");
+    system "scancel $ENV{SLURM_JOBID}";
+  }
+}
+
+
+sub readfrompipes
+{
+  my $gotsome = 0;
+  foreach my $job (keys %reader)
+  {
+    my $buf;
+    while (0 < sysread ($reader{$job}, $buf, 8192))
+    {
+      print STDERR $buf if $ENV{MR_DEBUG};
+      $jobstep[$job]->{stderr} .= $buf;
+      preprocess_stderr ($job);
+      if (length ($jobstep[$job]->{stderr}) > 16384 &&
+         $jobstep[$job]->{stderr} !~ /\+\+\+mr/)
+      {
+       substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+      }
+      $gotsome = 1;
+    }
+  }
+  return $gotsome;
+}
+
+
+sub process_stderr_for_output_key
+{
+  my $job = shift;
+  while ($jobstep[$job]->{stderr} =~ s/\+\+\+mrout (.*?)\+\+\+\n//s)
+  {
+    $jobstep[$job]->{output} = $1;
+    $jobsteps_must_output_keys = 1;
+  }
+}
+
+
+sub preprocess_stderr
+{
+  my $job = shift;
+
+  $jobstep[$job]->{stderr_jobsteps} = []
+      if !exists $jobstep[$job]->{stderr_jobsteps};
+
+  $jobstep[$job]->{stderr} =~
+      s{\+\+\+mrjobstep((\/(\d+|\*))? (\d+) (.*?))\+\+\+\n}{
+       push (@{ $jobstep[$job]->{stderr_jobsteps} }, $1);
+       "";
+      }gse;
+
+  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
+    my $line = $1;
+    if ($line =~ /\+\+\+mr/) {
+      last;
+    }
+    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
+    Log ($job, "stderr $line");
+    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
+      # whoa.
+      $main::please_freeze = 1;
+    }
+    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
+      $jobstep[$job]->{node_fail} = 1;
+      ban_node_by_slot($jobstep[$job]->{slotindex});
+    }
+  }
+}
+
+
+sub process_stderr
+{
+  my $job = shift;
+  my $success = shift;
+  preprocess_stderr ($job);
+
+  map {
+    Log ($job, "stderr $_");
+  } split ("\n", $jobstep[$job]->{stderr});
+
+  if (!$success || !exists $jobstep[$job]->{stderr_jobsteps})
+  {
+    delete $jobstep[$job]->{stderr_jobsteps};
+    return;
+  }
+
+  foreach (@{ $jobstep[$job]->{stderr_jobsteps} })
+  {
+    /^(?:\/(\d+|\*))? (\d+) (.*)/s;
+    my ($merge, $level, $input) = ($1, $2, $3);
+    my $newjobref;
+    if ($merge)
+    {
+      push @jobstep_tomerge, $input;
+      $jobstep_tomerge_level = $level;
+      if ($merge !~ /\D/ && @jobstep_tomerge >= $merge)
+      {
+       $newjobref = { input => join ("\n",
+                                     splice @jobstep_tomerge, 0, $merge),
+                      level => $level,
+                      attempts => 0 };
+      }
+    }
+    else
+    {
+      $newjobref = { input => $input,
+                    level => $level,
+                    attempts => 0 };
+    }
+    if ($newjobref)
+    {
+      push @jobstep, $newjobref;
+      push @jobstep_todo, $#jobstep;
+    }
+  }
+  delete $jobstep[$job]->{stderr_jobsteps};
+}
+
+
+sub collate_output
+{
+  Log (undef, "collate");
+  $whc->write_start (1);
+  my $key;
+  for (@jobstep)
+  {
+    next if !exists $_->{output} || $_->{exitcode} != 0;
+    my $output = $_->{output};
+    if ($output !~ /^[0-9a-f]{32}/)
+    {
+      $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
+      $whc->write_data ($output);
+    }
+    elsif (@jobstep == 1)
+    {
+      $key = $output;
+      $whc->write_finish;
+    }
+    elsif (defined (my $outblock = $whc->fetch_block ($output)))
+    {
+      $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
+      $whc->write_data ($outblock);
+    }
+    else
+    {
+      my $errstr = $whc->errstr;
+      $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
+      $success = 0;
+    }
+  }
+  $key = $whc->write_finish if !defined $key;
+  if ($key)
+  {
+    Log (undef, "outputkey $key");
+    dbh_do ("update mrjob set output=? where id=?", undef,
+           $key, $job_id)
+       or Log (undef, "db update failed: ".$DBI::errstr);
+  }
+  else
+  {
+    Log (undef, "outputkey undef");
+  }
+  return $key;
+}
+
+
+sub killem
+{
+  foreach (@_)
+  {
+    my $sig = 2;               # SIGINT first
+    if (exists $proc{$_}->{"sent_$sig"} &&
+       time - $proc{$_}->{"sent_$sig"} > 4)
+    {
+      $sig = 15;               # SIGTERM if SIGINT doesn't work
+    }
+    if (exists $proc{$_}->{"sent_$sig"} &&
+       time - $proc{$_}->{"sent_$sig"} > 4)
+    {
+      $sig = 9;                        # SIGKILL if SIGTERM doesn't work
+    }
+    if (!exists $proc{$_}->{"sent_$sig"})
+    {
+      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
+      kill $sig, $_;
+      select (undef, undef, undef, 0.1);
+      if ($sig == 2)
+      {
+       kill $sig, $_;     # srun wants two SIGINT to really interrupt
+      }
+      $proc{$_}->{"sent_$sig"} = time;
+      $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
+    }
+  }
+}
+
+
+sub fhbits
+{
+  my($bits);
+  for (@_) {
+    vec($bits,fileno($_),1) = 1;
+  }
+  $bits;
+}
+
+
+sub Log                                # ($jobstep_id, $logmessage)
+{
+  if ($_[1] =~ /\n/) {
+    for my $line (split (/\n/, $_[1])) {
+      Log ($_[0], $line);
+    }
+    return;
+  }
+  my $fh = select STDERR; $|=1; select $fh;
+  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
+  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
+  $message .= "\n";
+  my $datetime;
+  if ($metastream || -t STDERR) {
+    my @gmtime = gmtime;
+    $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
+                        $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
+  }
+  print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
+
+  return if !$metastream;
+  $metastream->write_data ($datetime . " " . $message);
+}
+
+
+sub reconnect_database
+{
+  return if !$have_database;
+  return if ($dbh && $dbh->do ("select now()"));
+  for (1..16)
+  {
+    $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
+    if ($dbh) {
+      $dbh->{InactiveDestroy} = 1;
+      return;
+    }
+    warn ($DBI::errstr);
+    sleep $_;
+  }
+  croak ($DBI::errstr) if !$dbh;
+}
+
+
+sub dbh_do
+{
+  return 1 if !$have_database;
+  my $ret = $dbh->do (@_);
+  return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
+  reconnect_database();
+  return $dbh->do (@_);
+}
+
+
+sub croak
+{
+  my ($package, $file, $line) = caller;
+  my $message = "@_ at $file line $line\n";
+  Log (undef, $message);
+  freeze() if @jobstep_todo;
+  collate_output() if @jobstep_todo;
+  cleanup();
+  save_meta() if $metastream;
+  die;
+}
+
+
+sub cleanup
+{
+  return if !$have_database || !$dbh;
+
+  reconnect_database();
+  my $sth;
+  $sth = $dbh->prepare ("update mrjobmanager set finishtime=now() where id=?");
+  $sth->execute ($jobmanager_id);
+  $sth = $dbh->prepare ("update mrjob set success=0, finishtime=now() where id=? and jobmanager_id=? and finishtime is null");
+  $sth->execute ($job_id, $jobmanager_id);
+}
+
+
+sub save_meta
+{
+  reconnect_database();
+  my $justcheckpoint = shift; # false if this will be the last meta saved
+  my $m = $metastream;
+  $m = $m->copy if $justcheckpoint;
+  $m->write_finish;
+  my $key = $m->as_key;
+  undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
+  Log (undef, "meta key is $key");
+  dbh_do ("update mrjob set metakey=? where id=?",
+         undef,
+         $key, $job_id);
+}
+
+
+sub freeze_if_want_freeze
+{
+  if ($main::please_freeze)
+  {
+    release_allocation();
+    if (@_)
+    {
+      # kill some srun procs before freeze+stop
+      map { $proc{$_} = {} } @_;
+      while (%proc)
+      {
+       killem (keys %proc);
+       select (undef, undef, undef, 0.1);
+       my $died;
+       while (($died = waitpid (-1, WNOHANG)) > 0)
+       {
+         delete $proc{$died};
+       }
+      }
+    }
+    freeze();
+    collate_output();
+    cleanup();
+    save_meta();
+    exit 0;
+  }
+}
+
+
+sub freeze
+{
+  Log (undef, "freeze");
+
+  my $freezer = new Warehouse::Stream (whc => $whc);
+  $freezer->clear;
+  $freezer->name (".");
+  $freezer->write_start ("state.txt");
+
+  $freezer->write_data (join ("\n",
+                             "job $Job->{id}",
+                             map
+                             {
+                               $_ . "=" . freezequote($Job->{$_})
+                             } grep { $_ ne "id" } keys %$Job) . "\n\n");
+
+  foreach my $Jobstep (@jobstep)
+  {
+    my $str = join ("\n",
+                   map
+                   {
+                     $_ . "=" . freezequote ($Jobstep->{$_})
+                   } grep {
+                      $_ !~ /^stderr|slotindex|node_fail/
+                    } keys %$Jobstep);
+    $freezer->write_data ($str."\n\n");
+  }
+  if (@jobstep_tomerge)
+  {
+    $freezer->write_data
+       ("merge $jobstep_tomerge_level "
+        . freezequote (join ("\n",
+                             map { freezequote ($_) } @jobstep_tomerge))
+        . "\n\n");
+  }
+
+  $freezer->write_finish;
+  my $frozentokey = $freezer->as_key;
+  undef $freezer;
+  Log (undef, "frozento key is $frozentokey");
+  dbh_do ("update mrjob set frozentokey=? where id=?", undef,
+         $frozentokey, $job_id);
+  my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
+  Log (undef, "frozento+K key is $kfrozentokey");
+  return $frozentokey;
+}
+
+
+sub thaw
+{
+  my $key = shift;
+  Log (undef, "thaw from $key");
+
+  @jobstep = ();
+  @jobstep_done = ();
+  @jobstep_todo = ();
+  @jobstep_tomerge = ();
+  $jobstep_tomerge_level = 0;
+  my $frozenjob = {};
+
+  my $stream = new Warehouse::Stream ( whc => $whc,
+                                      hash => [split (",", $key)] );
+  $stream->rewind;
+  while (my $dataref = $stream->read_until (undef, "\n\n"))
+  {
+    if ($$dataref =~ /^job /)
+    {
+      foreach (split ("\n", $$dataref))
+      {
+       my ($k, $v) = split ("=", $_, 2);
+       $frozenjob->{$k} = freezeunquote ($v);
+      }
+      next;
+    }
+
+    if ($$dataref =~ /^merge (\d+) (.*)/)
+    {
+      $jobstep_tomerge_level = $1;
+      @jobstep_tomerge
+         = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
+      next;
+    }
+
+    my $Jobstep = { };
+    foreach (split ("\n", $$dataref))
+    {
+      my ($k, $v) = split ("=", $_, 2);
+      $Jobstep->{$k} = freezeunquote ($v) if $k;
+    }
+    $Jobstep->{attempts} = 0;
+    push @jobstep, $Jobstep;
+
+    if ($Jobstep->{exitcode} eq "0")
+    {
+      push @jobstep_done, $#jobstep;
+    }
+    else
+    {
+      push @jobstep_todo, $#jobstep;
+    }
+  }
+
+  foreach (qw (mrfunction revision inputkey knobs))
+  {
+    $Job->{$_} = $frozenjob->{$_};
+  }
+  dbh_do
+      ("update mrjob
+       set mrfunction=?, revision=?, input0=?, knobs=?
+       where id=?", undef,
+       $Job->{mrfunction},
+       $Job->{revision},
+       $Job->{inputkey},
+       $Job->{knobs},
+       $Job->{id},
+      );
+}
+
+
+sub freezequote
+{
+  my $s = shift;
+  $s =~ s/\\/\\\\/g;
+  $s =~ s/\n/\\n/g;
+  return $s;
+}
+
+
+sub freezeunquote
+{
+  my $s = shift;
+  $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
+  return $s;
+}
+
+
+sub srun
+{
+  my $srunargs = shift;
+  my $execargs = shift;
+  my $opts = shift || {};
+  my $stdin = shift;
+  my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
+  print STDERR (join (" ",
+                     map { / / ? "'$_'" : $_ }
+                     (@$args)),
+               "\n")
+      if $ENV{MR_DEBUG};
+
+  if (defined $stdin) {
+    my $child = open STDIN, "-|";
+    defined $child or die "no fork: $!";
+    if ($child == 0) {
+      print $stdin or die $!;
+      close STDOUT or die $!;
+      exit 0;
+    }
+  }
+
+  return system (@$args) if $opts->{fork};
+
+  exec @$args;
+  warn "ENV size is ".length(join(" ",%ENV));
+  die "exec failed: $!: @$args";
+}
+
+
+sub ban_node_by_slot {
+  # Don't start any new jobsteps on this node for 60 seconds
+  my $slotid = shift;
+  $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
+  Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
+}