copy whjobmanager from warehouse-apps
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 whjobmanager: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from database, run tasks on compute nodes
11 (typically invoked by scheduler on cloud controller):
12
13  whjobmanager jobid
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  whjobmanager revision=PATH mrfunction=FUNC inputkey=LOCATOR \
19               [stepspernode=N] [SOMEKNOB=value] ...
20
21 =head1 RUNNING JOBS LOCALLY
22
23 whjobmanager(1p)'s log messages appear on stderr, and are saved in the
24 warehouse at each checkpoint and when the job finishes.
25
26 If the job succeeds, the job's output locator is printed on stdout.
27
28 If a job step outputs anything to stderr, it appears in
29 whjobmanager(1p)'s log when the step finishes.
30
31 While the job is running, the following signals are accepted:
32
33 =over
34
35 =item control-C, SIGINT, SIGQUIT
36
37 Save a checkpoint, terminate any job steps that are running, and stop.
38
39 =item SIGALRM
40
41 Save a checkpoint and continue.
42
43 =back
44
45 =head1 SEE ALSO
46
47 whintro(1p), wh(1p)
48
49 =cut
50
51
52 use strict;
53 use DBI;
54 use POSIX ':sys_wait_h';
55 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
56 use Warehouse;
57 use Warehouse::Stream;
58
59 $ENV{"TMPDIR"} ||= "/tmp";
60
61 do '/etc/warehouse/warehouse-server.conf';
62
63 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
64 my $have_database = @ARGV == 1 && $ARGV[0] =~ /^\d+$/;
65
66
67 $SIG{'USR1'} = sub
68 {
69   $main::ENV{MR_DEBUG} = 1;
70 };
71 $SIG{'USR2'} = sub
72 {
73   $main::ENV{MR_DEBUG} = 0;
74 };
75
76
77
78 my $whc = new Warehouse or croak ("failed to create Warehouse client");
79 my $metastream = new Warehouse::Stream (whc => $whc);
80 $metastream->clear;
81 $metastream->name (".");
82 $metastream->write_start ("log.txt");
83
84
85
86 my $Job = {};
87 my $job_id;
88 my $dbh;
89 my $sth;
90 if ($have_database)
91 {
92   ($job_id) = @ARGV;
93
94   $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
95   croak ($DBI::errstr) if !$dbh;
96   $dbh->{InactiveDestroy} = 1;
97
98   $sth = $dbh->prepare ("select * from mrjob where id=?");
99   $sth->execute ($job_id) or croak ($dbh->errstr);
100   $Job = $sth->fetchrow_hashref or croak ($sth->errstr);
101 }
102
103 else
104 {
105   my %knob;
106   foreach (@ARGV)
107   {
108     if (/([a-z].*?)=(.*)/) {
109       $Job->{$1} = $2;
110     } elsif (/(.*?)=(.*)/) {
111       $knob{$1} = $2;
112     }
113   }
114   $Job->{knobs} = join ("\n", map { "$_=$knob{$_}" } sort keys %knob);
115
116   if (!$Job->{thawedfromkey})
117   {
118     map { croak ("No $_ specified") unless $Job->{$_} }
119     qw(mrfunction revision inputkey);
120   }
121
122   if (!defined $Job->{id}) {
123     chomp ($Job->{id} = sprintf ("%d.%d\@%s", time, $$, `hostname`));
124   }
125   $job_id = $Job->{id};
126 }
127
128
129
130 $Job->{inputkey} = $Job->{input0} if !exists $Job->{inputkey};
131 delete $Job->{input0};
132
133
134
135 my $max_ncpus;
136 map { $max_ncpus = $1 if /^STEPSPERNODE=(.*)/ } split ("\n", $$Job{knobs});
137 $max_ncpus = $1 if $$Job{nodes} =~ /\&(\d+)/;
138 $max_ncpus = $$Job{stepspernode} if $$Job{stepspernode};
139 my $maxstepspernode;
140
141
142
143 Log (undef, "check slurm allocation");
144 my @slot;
145 my @node;
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
147 my @sinfo;
148 if (!$have_slurm)
149 {
150   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151   push @sinfo, "$localcpus localhost";
152 }
153 if (exists $ENV{SLURM_NODELIST})
154 {
155   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
156 }
157 foreach (@sinfo)
158 {
159   my ($ncpus, $slurm_nodelist) = split;
160   $ncpus = $max_ncpus if defined ($max_ncpus) && $ncpus > $max_ncpus && $max_ncpus > 0;
161   $maxstepspernode = $ncpus if !defined $maxstepspernode || $maxstepspernode < $ncpus;
162
163   my @nodelist;
164   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
165   {
166     my $nodelist = $1;
167     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
168     {
169       my $ranges = $1;
170       foreach (split (",", $ranges))
171       {
172         my ($a, $b);
173         if (/(\d+)-(\d+)/)
174         {
175           $a = $1;
176           $b = $2;
177         }
178         else
179         {
180           $a = $_;
181           $b = $_;
182         }
183         push @nodelist, map {
184           my $n = $nodelist;
185           $n =~ s/\[[-,\d]+\]/$_/;
186           $n;
187         } ($a..$b);
188       }
189     }
190     else
191     {
192       push @nodelist, $nodelist;
193     }
194   }
195   foreach my $nodename (@nodelist)
196   {
197     Log (undef, "node $nodename - $ncpus slots");
198     my $node = { name => $nodename,
199                  ncpus => $ncpus,
200                  losing_streak => 0,
201                  hold_until => 0 };
202     foreach my $cpu (1..$ncpus)
203     {
204       push @slot, { node => $node,
205                     cpu => $cpu };
206     }
207   }
208   push @node, @nodelist;
209 }
210
211
212
213 # Ensure that we get one jobstep running on each allocated node before
214 # we start overloading nodes with concurrent steps
215
216 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
217
218
219
220 my $jobmanager_id;
221 if ($have_database)
222 {
223   # Claim this job, and make sure nobody else does
224
225   $sth = $dbh->prepare ("insert into mrjobmanager
226                          (pid, revision, starttime)
227                          values (?, ?, now())");
228   my $rev = q/$Revision$/;
229   $rev =~ /\d+/;
230   $sth->execute ($$, +$&) or croak ($dbh->errstr);
231
232   $sth = $dbh->prepare ("select last_insert_id()");
233   $sth->execute or croak ($dbh->errstr);
234   ($jobmanager_id) = $sth->fetchrow_array;
235
236   $sth = $dbh->prepare ("update mrjob set jobmanager_id=?, starttime=now()
237                          where id=? and jobmanager_id is null");
238   $sth->execute ($jobmanager_id, $job_id) or croak ($dbh->errstr);
239
240   $sth = $dbh->prepare ("select jobmanager_id from mrjob
241                          where id=?");
242   $sth->execute ($job_id) or croak ($dbh->errstr);
243   my ($check_jobmanager_id) = $sth->fetchrow_array;
244   if ($check_jobmanager_id != $jobmanager_id)
245   {
246     # race condition - another job manager proc stole the job
247     Log (undef,
248          "job taken by jobmanager id $check_jobmanager_id");
249     exit (1);
250   }
251 }
252
253
254 Log (undef, "start");
255 $SIG{'INT'} = sub { $main::please_freeze = 1; };
256 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
257 $SIG{'TERM'} = \&croak;
258 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
259 $SIG{'ALRM'} = sub { $main::please_info = 1; };
260 $SIG{'CONT'} = sub { $main::please_continue = 1; };
261 $main::please_freeze = 0;
262 $main::please_info = 0;
263 $main::please_continue = 0;
264 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
265
266 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
267 $ENV{"MR_JOB_ID"} = $job_id;
268 $ENV{"JOB_UUID"} = $job_id;
269
270
271 my @jobstep;
272 my @jobstep_todo = ();
273 my @jobstep_done = ();
274 my @jobstep_tomerge = ();
275 my $jobstep_tomerge_level = 0;
276 my $squeue_checked;
277 my $squeue_kill_checked;
278 my $output_in_keep = 0;
279
280
281
282 if (defined $Job->{thawedfromkey})
283 {
284   thaw ($Job->{thawedfromkey});
285 }
286 else
287 {
288   push @jobstep, { input => $Job->{inputkey},
289                    level => 0,
290                    attempts => 0,
291                  };
292   push @jobstep_todo, 0;
293 }
294
295
296
297 mkdir ($ENV{"TMPDIR"}."/mrcompute");
298 if ($$Job{knobs} =~ /^GPG_KEYS=(.*)/m) {
299   # set up a fresh gnupg directory just for this process
300   # TODO: reap abandoned gnupg dirs
301   system ("rm", "-rf", $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$");
302   mkdir ($ENV{"TMPDIR"}."/mrcompute");
303   mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg", 0700);
304   mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg/$$", 0700) || croak ("mkdir: $!");
305
306   my $newhomedir = $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$";
307
308   open C, ">", $newhomedir."/gpg.conf";
309   print C "always-trust\n";
310   close C;
311
312   # import secret keys referenced in job spec
313   my $hashes = $1;
314   $hashes =~ s/\'/\'\\\'\'/g;
315   my $gpg_out = `whget '$hashes' - | gpg --homedir "$newhomedir" --import 2>&1`;
316   my %encrypt_to;
317   while ($gpg_out =~ /^gpg: key ([0-9A-F]{8}): /gm) {
318     my $keynum = $1;
319     while (`gpg --homedir "$newhomedir" --list-keys "$keynum"` =~ /^uid\s.*<(.+?)>/gm) {
320       $encrypt_to{$1} = 1;
321     }
322   }
323   if (!%encrypt_to) {
324     croak ("GPG_KEYS provided but failed to import keys:\n$gpg_out");
325   }
326
327   if ($have_database) {
328
329     # make sure the job request was signed by all of the secret keys
330     # contained in GPG_KEYS (otherwise, any VM can just copy the
331     # GPG_KEYS hash from an existing mr-job and submit new jobs that can
332     # read private data)
333
334     my %did_not_sign;
335     my $seckeys = `gpg --homedir "$newhomedir" --list-secret-keys --with-fingerprint`;
336     while ($seckeys =~ /Key fingerprint.*?([0-9A-F][0-9A-F ]+[0-9A-F])/mgi) {
337       $did_not_sign{$1} = 1;
338     }
339     my $srfile = "$newhomedir/signedrequest";
340     open SREQ, ">", $srfile;
341     print SREQ $$Job{"signedrequest"};
342     close SREQ;
343     my $gpg_v = `gpg --homedir "$newhomedir" --verify --with-fingerprint "$srfile" 2>&1 && echo ok`;
344     unlink $srfile;
345     if ($gpg_v =~ /\nok\n$/s) {
346       while ($gpg_v =~ /Good signature.*? key fingerprint: (\S[^\n]+\S)/sgi) {
347         delete $did_not_sign{$1};
348       }
349     }
350     if (%did_not_sign) {
351       croak (join ("\n",
352                    "Some secret keys provided did not sign this job request:",
353                    keys %did_not_sign) . "\n");
354     }
355   }
356
357   my $hostname = `hostname`;
358   chomp ($hostname);
359
360   # tell mrjobsteps the decrypted secret key(s) and all public key(s) they might need
361   $ENV{"GPG_KEYS"} = `gpg --homedir "$newhomedir" --export-secret-keys --armor`;
362   $ENV{"GPG_PUBLIC_KEYS"} = `gpg --export --armor | ENCRYPT_TO= whput -`;
363
364   # import all secret keys from my real home dir
365   `gpg --export-secret-keys | gpg --homedir "$newhomedir" --import 2>&1`;
366
367   # use the new gnupg dir from now on
368   $ENV{"GNUPGHOME"} = $newhomedir;
369
370   # if I have a secret key for root@{host} or {user}@{host} or
371   # {configured-controller-gpg-uid}, add that as a recipient too so
372   # I'll be able to read frozentokeys etc. later
373   my %allkeys;
374   while (`gpg --list-secret-keys` =~ /^uid\s.*?<(.+?)>/gm) {
375     $allkeys{$1} = 1;
376   }
377   my $encrypting_to_self = 0;
378   my @try_these_uids = ("root\@".$hostname, $ENV{"USER"}."\@".$hostname);
379   push @try_these_uids, ($whc->{config}->{controller_gpg_uid})
380       if exists $whc->{config}->{controller_gpg_uid};
381   foreach my $id (@try_these_uids) {
382     if (exists $allkeys{$id}) {
383       $encrypt_to{$id} = 1;
384       $encrypting_to_self = 1;
385       last;
386     }
387   }
388
389   if (!$encrypting_to_self) {
390     croak ("Failed to find a secret key for any of [@try_these_uids] -- giving up instead of writing meta/freeze data that I won't be able to read");
391   }
392
393   # tell the client library (and child procs and jobsteps) to encrypt using these keys
394   $ENV{"ENCRYPT_TO"} = join (",", sort keys %encrypt_to);
395   Log (undef, "encrypt_to ('".$ENV{"ENCRYPT_TO"}."')");
396   $whc->set_config ("encrypt_to", $ENV{"ENCRYPT_TO"});
397 }
398
399
400
401 $ENV{"MR_REVISION"} = $Job->{revision};
402
403 my $git_build_script;
404 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
405 if ($skip_install)
406 {
407   $ENV{"MR_REVISION_SRCDIR"} = $Job->{revision};
408 }
409 else
410 {
411   Log (undef, "Install revision ".$Job->{revision});
412   my $nodelist = join(",", @node);
413
414   # Clean out mrcompute/work and mrcompute/opt
415
416   my $cleanpid = fork();
417   if ($cleanpid == 0)
418   {
419     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{TMPDIR}],
420           ['bash', '-c', 'if mount | grep -q $TMPDIR/mrcompute/work/; then sudo /bin/umount $TMPDIR/mrcompute/work/* 2>/dev/null; fi; sleep 1; rm -rf $TMPDIR/mrcompute/work $TMPDIR/mrcompute/opt']);
421     exit (1);
422   }
423   while (1)
424   {
425     last if $cleanpid == waitpid (-1, WNOHANG);
426     freeze_if_want_freeze ($cleanpid);
427     select (undef, undef, undef, 0.1);
428   }
429   Log (undef, "Clean-work-dir exited $?");
430
431   # Install requested code version
432
433   my $build_script;
434   my @execargs;
435   my @srunargs = ("srun",
436                   "--nodelist=$nodelist",
437                   "-D", $ENV{TMPDIR}, "--job-name=$job_id");
438
439   $ENV{"MR_REVISION"} = $Job->{revision};
440   $ENV{"MR_REVISION_SRCDIR"} = "$ENV{TMPDIR}/mrcompute/warehouse-apps";
441   $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/opt";
442
443   my $commit;
444   my $treeish = $Job->{revision};
445   my $repo = $Job->{git_clone_url} || $whc->get_config("git_clone_url");
446
447   # Create/update our clone of the remote git repo
448
449   if (!-d $ENV{MR_REVISION_SRCDIR}) {
450     system(qw(git clone), $repo, $ENV{MR_REVISION_SRCDIR}) == 0
451         or croak ("git clone $repo failed: exit ".($?>>8));
452     system("cd $ENV{MR_REVISION_SRCDIR} && git config clean.requireForce false");
453   }
454   `cd $ENV{MR_REVISION_SRCDIR} && git fetch -q`;
455
456   # If this looks like a subversion r#, look for it in git-svn commit messages
457
458   if ($treeish =~ m{^\d{1,4}$}) {
459     my $gitlog = `cd $ENV{MR_REVISION_SRCDIR} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
460     chomp $gitlog;
461     if ($gitlog =~ /^[a-f0-9]{40}$/) {
462       $commit = $gitlog;
463       Log (undef, "Using commit $commit for revision $treeish");
464     }
465   }
466
467   # If that didn't work, try asking git to look it up as a tree-ish.
468
469   if (!defined $commit) {
470
471     my $cooked_treeish = $treeish;
472     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
473       # Looks like a git branch name -- make sure git knows it's
474       # relative to the remote repo
475       $cooked_treeish = "origin/$treeish";
476     }
477
478     my $found = `cd $ENV{MR_REVISION_SRCDIR} && git rev-list -1 $cooked_treeish`;
479     chomp $found;
480     if ($found =~ /^[0-9a-f]{40}$/s) {
481       $commit = $found;
482       if ($commit ne $treeish) {
483         # Make sure we record the real commit id in the database,
484         # frozentokey, logs, etc. -- instead of an abbreviation or a
485         # branch name which can become ambiguous or point to a
486         # different commit in the future.
487         $ENV{"MR_REVISION"} = $commit;
488         $Job->{revision} = $commit;
489         dbh_do
490             ("update mrjob set revision=? where id=?",
491              undef,
492              $Job->{revision}, $Job->{id});
493         Log (undef, "Using commit $commit for tree-ish $treeish");
494       }
495     }
496   }
497
498   if (defined $commit) {
499     $ENV{"MR_GIT_COMMIT"} = $commit;
500     $ENV{"MR_GIT_CLONE_URL"} = $repo;
501     @execargs = ("sh", "-c",
502                  "mkdir -p $ENV{TMPDIR}/mrcompute/opt && cd $ENV{TMPDIR}/mrcompute && perl - $ENV{MR_REVISION_SRCDIR} $commit $repo");
503     open GBS, "<", `echo -n \$(which whjob-checkout-and-build)`
504         or croak ("can't find whjob-checkout-and-build");
505     local $/ = undef;
506     $git_build_script = <GBS>;
507     close GBS;
508     $build_script = $git_build_script;
509   }
510   elsif ($treeish =~ m{^(\d{1,5})$}) {
511     # Want a subversion r# but couldn't find it in git-svn history -
512     # might as well try using the subversion repo in case it's still
513     # there.
514     $ENV{"INSTALL_REPOS"} = $whc->get_config("svn_root");
515     $ENV{"INSTALL_REVISION"} = $Job->{revision};
516     $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/revision/$treeish";
517     $ENV{"MR_REVISION_SRCDIR"} = "$ENV{MR_REVISION_INSTALLDIR}/src";
518     @execargs = ("sh", "-c",
519                  "mkdir -p $ENV{TMPDIR}/mrcompute/revision && cd $ENV{TMPDIR}/mrcompute && ( [ -e $ENV{MR_REVISION_INSTALLDIR}/.tested ] || ( svn export --quiet \"\$INSTALL_REPOS/installrevision\" && INSTALLREVISION_NOLOCK=1 ./installrevision ) )");
520   }
521   else {
522     croak ("could not figure out commit id for $treeish");
523   }
524
525   my $installpid = fork();
526   if ($installpid == 0)
527   {
528     srun (\@srunargs, \@execargs, {}, $build_script);
529     exit (1);
530   }
531   while (1)
532   {
533     last if $installpid == waitpid (-1, WNOHANG);
534     freeze_if_want_freeze ($installpid);
535     select (undef, undef, undef, 0.1);
536   }
537   Log (undef, "Install exited $?");
538 }
539
540
541
542 foreach (qw (mrfunction revision nodes stepspernode inputkey))
543 {
544   Log (undef, $_ . " " . $Job->{$_});
545 }
546 foreach (split (/\n/, $Job->{knobs}))
547 {
548   Log (undef, "knob " . $_);
549 }
550
551
552
553 my $success;
554
555
556
557 ONELEVEL:
558
559 my $thisround_succeeded = 0;
560 my $thisround_failed = 0;
561 my $thisround_failed_multiple = 0;
562
563 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
564                        or $a <=> $b } @jobstep_todo;
565 my $level = $jobstep[$jobstep_todo[0]]->{level};
566 Log (undef, "start level $level");
567
568
569
570 my %proc;
571 my @freeslot = (0..$#slot);
572 my @holdslot;
573 my %reader;
574 my ($id, $input, $attempts);
575 my $progress_is_dirty = 1;
576 my $progress_stats_updated = 0;
577
578 update_progress_stats();
579
580
581
582 THISROUND:
583 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
584 {
585   $main::please_continue = 0;
586
587   my $id = $jobstep_todo[$todo_ptr];
588   my $Jobstep = $jobstep[$id];
589   if ($Jobstep->{level} != $level)
590   {
591     next;
592   }
593   if ($Jobstep->{attempts} > 9)
594   {
595     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
596     $success = 0;
597     last THISROUND;
598   }
599
600   pipe $reader{$id}, "writer" or croak ($!);
601   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
602   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
603
604   my $childslot = $freeslot[0];
605   my $childnode = $slot[$childslot]->{node};
606   my $childslotname = join (".",
607                             $slot[$childslot]->{node}->{name},
608                             $slot[$childslot]->{cpu});
609   my $childpid = fork();
610   if ($childpid == 0)
611   {
612     $SIG{'INT'} = 'DEFAULT';
613     $SIG{'QUIT'} = 'DEFAULT';
614     $SIG{'TERM'} = 'DEFAULT';
615
616     foreach (values (%reader))
617     {
618       close($_);
619     }
620     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
621     open(STDOUT,">&writer");
622     open(STDERR,">&writer");
623
624     undef $dbh;
625     undef $sth;
626
627
628     delete $ENV{"GNUPGHOME"};
629     $ENV{"MR_ID"} = $id;
630     $ENV{"MR_INPUT"} = $Jobstep->{input};
631     $ENV{"MR_KNOBS"} = $Job->{knobs};
632     $ENV{"MR_LEVEL"} = $level;
633     $ENV{"MR_FUNCTION"} = $Job->{mrfunction};
634     $ENV{"MR_INPUT0"} = $Job->{inputkey};
635     $ENV{"MR_INPUTKEY"} = $Job->{inputkey};
636     $ENV{"MR_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
637     $ENV{"MR_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
638     $ENV{"MR_SLOT"} = $slot[$childslot]->{cpu}; # deprecated
639     $ENV{"MR_JOB_TMP"} = $ENV{"TMPDIR"}."/job/work";
640     $ENV{"MR_JOBSTEP_TMP"} = $ENV{"TMPDIR"}."/job/work/".$slot[$childslot]->{cpu};
641     $ENV{"MR_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
642     $ENV{"MOGILEFS_TRACKERS"} = join (",", @main::mogilefs_trackers);
643     $ENV{"MOGILEFS_DOMAIN"} = $main::mogilefs_default_domain;
644     $ENV{"MOGILEFS_CLASS"} = $main::mogilefs_default_class;
645
646     $ENV{"TASK_UUID"} = $ENV{"JOB_UUID"} . "-" . $id;
647     $ENV{"TASK_QSEQUENCE"} = $id;
648     $ENV{"TASK_SEQUENCE"} = $Jobstep->{level};
649
650     $ENV{"GZIP"} = "-n";
651
652     my @srunargs = (
653       "srun",
654       "--nodelist=".$childnode->{name},
655       qw(-n1 -c1 -N1 -D), $ENV{TMPDIR},
656       "--job-name=$job_id.$id.$$",
657         );
658     my @execargs = qw(sh);
659     my $script = "";
660     my $command =
661         "mkdir -p $ENV{TMPDIR}/mrcompute/revision "
662         ."&& cd $ENV{TMPDIR}/mrcompute ";
663     if ($git_build_script)
664     {
665       $script = $git_build_script;
666       $command .=
667           "&& perl - $ENV{MR_REVISION_SRCDIR} $ENV{MR_GIT_COMMIT} $ENV{MR_GIT_CLONE_URL}";
668     }
669     elsif (!$skip_install)
670     {
671       $command .=
672           "&& "
673           ."( "
674           ."  [ -e '$ENV{MR_REVISION_INSTALLDIR}/.tested' ] "
675           ."|| "
676           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
677           ."    && ./installrevision "
678           ."  ) "
679           .") ";
680     }
681     if (exists $ENV{GPG_KEYS}) {
682       $command .=
683           "&& mkdir -p '$ENV{MR_JOBSTEP_TMP}' && (sudo /bin/umount '$ENV{MR_JOBSTEP_TMP}' 2>/dev/null || true) && rm -rf '$ENV{MR_JOBSTEP_TMP}' && exec $ENV{MR_REVISION_SRCDIR}/mapreduce/ecryptfs-wrapper -d '$ENV{MR_JOBSTEP_TMP}' -p $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager";
684     } else {
685       $command .=
686           "&& exec $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager";
687     }
688     my @execargs = ('bash', '-c', $command);
689     srun (\@srunargs, \@execargs, undef, $script);
690     exit (1);
691   }
692   close("writer");
693   if (!defined $childpid)
694   {
695     close $reader{$id};
696     delete $reader{$id};
697     next;
698   }
699   shift @freeslot;
700   $proc{$childpid} = { jobstep => $id,
701                        time => time,
702                        slot => $childslot,
703                        jobstepname => "$job_id.$id.$childpid",
704                      };
705   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
706   $slot[$childslot]->{pid} = $childpid;
707
708   Log ($id, "child $childpid started on $childslotname");
709   $Jobstep->{attempts} ++;
710   $Jobstep->{starttime} = time;
711   $Jobstep->{node} = $childnode->{name};
712   $Jobstep->{slotindex} = $childslot;
713   delete $Jobstep->{stderr};
714   delete $Jobstep->{output};
715   delete $Jobstep->{finishtime};
716
717   splice @jobstep_todo, $todo_ptr, 1;
718   --$todo_ptr;
719
720   $progress_is_dirty = 1;
721
722   while (!@freeslot
723          ||
724          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
725   {
726     last THISROUND if $main::please_freeze;
727     if ($main::please_info)
728     {
729       $main::please_info = 0;
730       freeze();
731       collate_output();
732       save_meta(1);
733       update_progress_stats();
734     }
735     my $gotsome
736         = readfrompipes ()
737         + reapchildren ();
738     if (!$gotsome)
739     {
740       check_squeue();
741       update_progress_stats();
742       select (undef, undef, undef, 0.1);
743     }
744     elsif (time - $progress_stats_updated >= 30)
745     {
746       update_progress_stats();
747     }
748     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
749         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
750     {
751       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
752           .($thisround_failed+$thisround_succeeded)
753           .") -- giving up on this round";
754       Log (undef, $message);
755       last THISROUND;
756     }
757
758     # move slots from freeslot to holdslot (or back to freeslot) if necessary
759     for (my $i=$#freeslot; $i>=0; $i--) {
760       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
761         push @holdslot, (splice @freeslot, $i, 1);
762       }
763     }
764     for (my $i=$#holdslot; $i>=0; $i--) {
765       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
766         push @freeslot, (splice @holdslot, $i, 1);
767       }
768     }
769
770     # give up if no nodes are succeeding
771     if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
772       my $message = "Every node has failed -- giving up on this round";
773       Log (undef, $message);
774       last THISROUND;
775     }
776   }
777 }
778
779
780 push @freeslot, splice @holdslot;
781 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
782
783
784 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
785 while (%proc)
786 {
787   goto THISROUND if $main::please_continue;
788   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
789   readfrompipes ();
790   if (!reapchildren())
791   {
792     check_squeue();
793     update_progress_stats();
794     select (undef, undef, undef, 0.1);
795     killem (keys %proc) if $main::please_freeze;
796   }
797 }
798
799 update_progress_stats();
800 freeze_if_want_freeze();
801
802
803 if (@jobstep_tomerge && !@jobstep_todo)
804 {
805   push @jobstep, { input => join ("\n", splice @jobstep_tomerge, 0),
806                    level => $jobstep_tomerge_level,
807                    attempts => 0 };
808   push @jobstep_todo, $#jobstep;
809 }
810
811
812 if (!defined $success)
813 {
814   if (@jobstep_todo &&
815       $thisround_succeeded == 0 &&
816       ($thisround_failed == 0 || $thisround_failed > 4))
817   {
818     my $message = "stop because $thisround_failed tasks failed and none succeeded";
819     Log (undef, $message);
820     $success = 0;
821   }
822   if (!@jobstep_todo)
823   {
824     $success = 1;
825   }
826 }
827
828 goto ONELEVEL if !defined $success;
829
830
831 release_allocation();
832 freeze();
833 my $key = &collate_output();
834 $success = 0 if !$key;
835
836
837 if ($key)
838 {
839   my @keepkey;
840   foreach my $hash (split ",", $key)
841   {
842     my $keephash = $whc->store_in_keep (hash => $hash,
843                                         nnodes => 3);
844     if (!$keephash)
845     {
846       Log (undef, "store_in_keep (\"$hash\") failed: ".$whc->errstr);
847       $keephash = $hash;
848     }
849     push @keepkey, $keephash;
850   }
851   my $keepkey = join (",", @keepkey);
852   Log (undef, "outputkey+K $keepkey");
853   print "$keepkey\n" if $success;
854
855   if ($output_in_keep)
856   {
857     $key = $keepkey;
858   }
859
860   dbh_do ("update mrjob set output=? where id=?", undef,
861           $key, $job_id)
862       or croak ($dbh->errstr);
863
864   $whc->store_manifest_by_name ($keepkey, undef, "/job$job_id")
865       or Log (undef, "store_manifest_by_name (\"$key\", \"/job$job_id\") failed: ".$whc->errstr);
866 }
867
868
869 Log (undef, "finish");
870
871 dbh_do ("update mrjob set finishtime=now(), success=?
872          where id=? and jobmanager_id=?", undef,
873         $success, $job_id, $jobmanager_id)
874     or croak ($dbh->errstr);
875
876 save_meta();
877 exit 0;
878
879
880
881 sub update_progress_stats
882 {
883   $progress_stats_updated = time;
884   return if !$progress_is_dirty;
885   my ($todo, $done, $running) = (scalar @jobstep_todo,
886                                  scalar @jobstep_done,
887                                  scalar @slot - scalar @freeslot - scalar @holdslot);
888   dbh_do
889       ("update mrjob set steps_todo=?,steps_done=?,steps_running=? where id=?",
890        undef,
891        $todo, $done, $running, $job_id);
892   Log (undef, "status: $done done, $running running, $todo todo");
893   $progress_is_dirty = 0;
894 }
895
896
897
898 sub reapchildren
899 {
900   my $pid = waitpid (-1, WNOHANG);
901   return 0 if $pid <= 0;
902
903   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
904                   . "."
905                   . $slot[$proc{$pid}->{slot}]->{cpu});
906   my $jobstepid = $proc{$pid}->{jobstep};
907   my $elapsed = time - $proc{$pid}->{time};
908   my $Jobstep = $jobstep[$jobstepid];
909
910   process_stderr_for_output_key ($jobstepid);
911
912   my $exitcode = $?;
913   my $exitinfo = "exit $exitcode";
914   if (!exists $Jobstep->{output})
915   {
916     $exitinfo .= " with no output key";
917     $exitcode = -1 if $exitcode == 0 && $jobsteps_must_output_keys;
918   }
919
920   if ($exitcode == 0 && $Jobstep->{node_fail}) {
921     $exitinfo .= " but recording as failure";
922     $exitcode = -1;
923   }
924
925   Log ($jobstepid, "child $pid on $whatslot $exitinfo");
926
927   if ($exitcode != 0 || $Jobstep->{node_fail})
928   {
929     --$Jobstep->{attempts} if $Jobstep->{node_fail};
930     ++$thisround_failed;
931     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
932
933     # Check for signs of a failed or misconfigured node
934     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
935         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
936       # Don't count this against jobstep failure thresholds if this
937       # node is already suspected faulty and srun exited quickly
938       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
939           $elapsed < 5 &&
940           $Jobstep->{attempts} > 1) {
941         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
942         --$Jobstep->{attempts};
943       }
944       ban_node_by_slot($proc{$pid}->{slot});
945     }
946
947     push @jobstep_todo, $jobstepid;
948     Log ($jobstepid, "failure in $elapsed seconds");
949   }
950   else
951   {
952     ++$thisround_succeeded;
953     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
954     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
955     push @jobstep_done, $jobstepid;
956     Log ($jobstepid, "success in $elapsed seconds");
957   }
958   $Jobstep->{exitcode} = $exitcode;
959   $Jobstep->{finishtime} = time;
960   process_stderr ($jobstepid, $exitcode == 0);
961   Log ($jobstepid, "output $$Jobstep{output}");
962
963   close $reader{$jobstepid};
964   delete $reader{$jobstepid};
965   delete $slot[$proc{$pid}->{slot}]->{pid};
966   push @freeslot, $proc{$pid}->{slot};
967   delete $proc{$pid};
968
969   $progress_is_dirty = 1;
970   1;
971 }
972
973
974 sub check_squeue
975 {
976   # return if the kill list was checked <4 seconds ago
977   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
978   {
979     return;
980   }
981   $squeue_kill_checked = time;
982
983   # use killem() on procs whose killtime is reached
984   for (keys %proc)
985   {
986     if (exists $proc{$_}->{killtime}
987         && $proc{$_}->{killtime} <= time)
988     {
989       killem ($_);
990     }
991   }
992
993   # return if the squeue was checked <60 seconds ago
994   if (defined $squeue_checked && $squeue_checked > time - 60)
995   {
996     return;
997   }
998   $squeue_checked = time;
999
1000   if (!$have_slurm)
1001   {
1002     # here is an opportunity to check for mysterious problems with local procs
1003     return;
1004   }
1005
1006   # get a list of steps still running
1007   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1008   chop @squeue;
1009   if ($squeue[-1] ne "ok")
1010   {
1011     return;
1012   }
1013   pop @squeue;
1014
1015   # which of my jobsteps are running, according to squeue?
1016   my %ok;
1017   foreach (@squeue)
1018   {
1019     if (/^(\d+)\.(\d+) (\S+)/)
1020     {
1021       if ($1 eq $ENV{SLURM_JOBID})
1022       {
1023         $ok{$3} = 1;
1024       }
1025     }
1026   }
1027
1028   # which of my active child procs (>60s old) were not mentioned by squeue?
1029   foreach (keys %proc)
1030   {
1031     if ($proc{$_}->{time} < time - 60
1032         && !exists $ok{$proc{$_}->{jobstepname}}
1033         && !exists $proc{$_}->{killtime})
1034     {
1035       # kill this proc if it hasn't exited in 30 seconds
1036       $proc{$_}->{killtime} = time + 30;
1037     }
1038   }
1039 }
1040
1041
1042 sub release_allocation
1043 {
1044   if ($have_slurm)
1045   {
1046     Log (undef, "release job allocation");
1047     system "scancel $ENV{SLURM_JOBID}";
1048   }
1049 }
1050
1051
1052 sub readfrompipes
1053 {
1054   my $gotsome = 0;
1055   foreach my $job (keys %reader)
1056   {
1057     my $buf;
1058     while (0 < sysread ($reader{$job}, $buf, 8192))
1059     {
1060       print STDERR $buf if $ENV{MR_DEBUG};
1061       $jobstep[$job]->{stderr} .= $buf;
1062       preprocess_stderr ($job);
1063       if (length ($jobstep[$job]->{stderr}) > 16384 &&
1064           $jobstep[$job]->{stderr} !~ /\+\+\+mr/)
1065       {
1066         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1067       }
1068       $gotsome = 1;
1069     }
1070   }
1071   return $gotsome;
1072 }
1073
1074
1075 sub process_stderr_for_output_key
1076 {
1077   my $job = shift;
1078   while ($jobstep[$job]->{stderr} =~ s/\+\+\+mrout (.*?)\+\+\+\n//s)
1079   {
1080     $jobstep[$job]->{output} = $1;
1081     $jobsteps_must_output_keys = 1;
1082   }
1083 }
1084
1085
1086 sub preprocess_stderr
1087 {
1088   my $job = shift;
1089
1090   $jobstep[$job]->{stderr_jobsteps} = []
1091       if !exists $jobstep[$job]->{stderr_jobsteps};
1092
1093   $jobstep[$job]->{stderr} =~
1094       s{\+\+\+mrjobstep((\/(\d+|\*))? (\d+) (.*?))\+\+\+\n}{
1095         push (@{ $jobstep[$job]->{stderr_jobsteps} }, $1);
1096         "";
1097       }gse;
1098
1099   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1100     my $line = $1;
1101     if ($line =~ /\+\+\+mr/) {
1102       last;
1103     }
1104     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1105     Log ($job, "stderr $line");
1106     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
1107       # whoa.
1108       $main::please_freeze = 1;
1109     }
1110     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1111       $jobstep[$job]->{node_fail} = 1;
1112       ban_node_by_slot($jobstep[$job]->{slotindex});
1113     }
1114   }
1115 }
1116
1117
1118 sub process_stderr
1119 {
1120   my $job = shift;
1121   my $success = shift;
1122   preprocess_stderr ($job);
1123
1124   map {
1125     Log ($job, "stderr $_");
1126   } split ("\n", $jobstep[$job]->{stderr});
1127
1128   if (!$success || !exists $jobstep[$job]->{stderr_jobsteps})
1129   {
1130     delete $jobstep[$job]->{stderr_jobsteps};
1131     return;
1132   }
1133
1134   foreach (@{ $jobstep[$job]->{stderr_jobsteps} })
1135   {
1136     /^(?:\/(\d+|\*))? (\d+) (.*)/s;
1137     my ($merge, $level, $input) = ($1, $2, $3);
1138     my $newjobref;
1139     if ($merge)
1140     {
1141       push @jobstep_tomerge, $input;
1142       $jobstep_tomerge_level = $level;
1143       if ($merge !~ /\D/ && @jobstep_tomerge >= $merge)
1144       {
1145         $newjobref = { input => join ("\n",
1146                                       splice @jobstep_tomerge, 0, $merge),
1147                        level => $level,
1148                        attempts => 0 };
1149       }
1150     }
1151     else
1152     {
1153       $newjobref = { input => $input,
1154                      level => $level,
1155                      attempts => 0 };
1156     }
1157     if ($newjobref)
1158     {
1159       push @jobstep, $newjobref;
1160       push @jobstep_todo, $#jobstep;
1161     }
1162   }
1163   delete $jobstep[$job]->{stderr_jobsteps};
1164 }
1165
1166
1167 sub collate_output
1168 {
1169   Log (undef, "collate");
1170   $whc->write_start (1);
1171   my $key;
1172   for (@jobstep)
1173   {
1174     next if !exists $_->{output} || $_->{exitcode} != 0;
1175     my $output = $_->{output};
1176     if ($output !~ /^[0-9a-f]{32}/)
1177     {
1178       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1179       $whc->write_data ($output);
1180     }
1181     elsif (@jobstep == 1)
1182     {
1183       $key = $output;
1184       $whc->write_finish;
1185     }
1186     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1187     {
1188       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1189       $whc->write_data ($outblock);
1190     }
1191     else
1192     {
1193       my $errstr = $whc->errstr;
1194       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1195       $success = 0;
1196     }
1197   }
1198   $key = $whc->write_finish if !defined $key;
1199   if ($key)
1200   {
1201     Log (undef, "outputkey $key");
1202     dbh_do ("update mrjob set output=? where id=?", undef,
1203             $key, $job_id)
1204         or Log (undef, "db update failed: ".$DBI::errstr);
1205   }
1206   else
1207   {
1208     Log (undef, "outputkey undef");
1209   }
1210   return $key;
1211 }
1212
1213
1214 sub killem
1215 {
1216   foreach (@_)
1217   {
1218     my $sig = 2;                # SIGINT first
1219     if (exists $proc{$_}->{"sent_$sig"} &&
1220         time - $proc{$_}->{"sent_$sig"} > 4)
1221     {
1222       $sig = 15;                # SIGTERM if SIGINT doesn't work
1223     }
1224     if (exists $proc{$_}->{"sent_$sig"} &&
1225         time - $proc{$_}->{"sent_$sig"} > 4)
1226     {
1227       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1228     }
1229     if (!exists $proc{$_}->{"sent_$sig"})
1230     {
1231       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1232       kill $sig, $_;
1233       select (undef, undef, undef, 0.1);
1234       if ($sig == 2)
1235       {
1236         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1237       }
1238       $proc{$_}->{"sent_$sig"} = time;
1239       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1240     }
1241   }
1242 }
1243
1244
1245 sub fhbits
1246 {
1247   my($bits);
1248   for (@_) {
1249     vec($bits,fileno($_),1) = 1;
1250   }
1251   $bits;
1252 }
1253
1254
1255 sub Log                         # ($jobstep_id, $logmessage)
1256 {
1257   if ($_[1] =~ /\n/) {
1258     for my $line (split (/\n/, $_[1])) {
1259       Log ($_[0], $line);
1260     }
1261     return;
1262   }
1263   my $fh = select STDERR; $|=1; select $fh;
1264   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1265   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1266   $message .= "\n";
1267   my $datetime;
1268   if ($metastream || -t STDERR) {
1269     my @gmtime = gmtime;
1270     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1271                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1272   }
1273   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1274
1275   return if !$metastream;
1276   $metastream->write_data ($datetime . " " . $message);
1277 }
1278
1279
1280 sub reconnect_database
1281 {
1282   return if !$have_database;
1283   return if ($dbh && $dbh->do ("select now()"));
1284   for (1..16)
1285   {
1286     $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1287     if ($dbh) {
1288       $dbh->{InactiveDestroy} = 1;
1289       return;
1290     }
1291     warn ($DBI::errstr);
1292     sleep $_;
1293   }
1294   croak ($DBI::errstr) if !$dbh;
1295 }
1296
1297
1298 sub dbh_do
1299 {
1300   return 1 if !$have_database;
1301   my $ret = $dbh->do (@_);
1302   return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1303   reconnect_database();
1304   return $dbh->do (@_);
1305 }
1306
1307
1308 sub croak
1309 {
1310   my ($package, $file, $line) = caller;
1311   my $message = "@_ at $file line $line\n";
1312   Log (undef, $message);
1313   freeze() if @jobstep_todo;
1314   collate_output() if @jobstep_todo;
1315   cleanup();
1316   save_meta() if $metastream;
1317   die;
1318 }
1319
1320
1321 sub cleanup
1322 {
1323   return if !$have_database || !$dbh;
1324
1325   reconnect_database();
1326   my $sth;
1327   $sth = $dbh->prepare ("update mrjobmanager set finishtime=now() where id=?");
1328   $sth->execute ($jobmanager_id);
1329   $sth = $dbh->prepare ("update mrjob set success=0, finishtime=now() where id=? and jobmanager_id=? and finishtime is null");
1330   $sth->execute ($job_id, $jobmanager_id);
1331 }
1332
1333
1334 sub save_meta
1335 {
1336   reconnect_database();
1337   my $justcheckpoint = shift; # false if this will be the last meta saved
1338   my $m = $metastream;
1339   $m = $m->copy if $justcheckpoint;
1340   $m->write_finish;
1341   my $key = $m->as_key;
1342   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1343   Log (undef, "meta key is $key");
1344   dbh_do ("update mrjob set metakey=? where id=?",
1345           undef,
1346           $key, $job_id);
1347 }
1348
1349
1350 sub freeze_if_want_freeze
1351 {
1352   if ($main::please_freeze)
1353   {
1354     release_allocation();
1355     if (@_)
1356     {
1357       # kill some srun procs before freeze+stop
1358       map { $proc{$_} = {} } @_;
1359       while (%proc)
1360       {
1361         killem (keys %proc);
1362         select (undef, undef, undef, 0.1);
1363         my $died;
1364         while (($died = waitpid (-1, WNOHANG)) > 0)
1365         {
1366           delete $proc{$died};
1367         }
1368       }
1369     }
1370     freeze();
1371     collate_output();
1372     cleanup();
1373     save_meta();
1374     exit 0;
1375   }
1376 }
1377
1378
1379 sub freeze
1380 {
1381   Log (undef, "freeze");
1382
1383   my $freezer = new Warehouse::Stream (whc => $whc);
1384   $freezer->clear;
1385   $freezer->name (".");
1386   $freezer->write_start ("state.txt");
1387
1388   $freezer->write_data (join ("\n",
1389                               "job $Job->{id}",
1390                               map
1391                               {
1392                                 $_ . "=" . freezequote($Job->{$_})
1393                               } grep { $_ ne "id" } keys %$Job) . "\n\n");
1394
1395   foreach my $Jobstep (@jobstep)
1396   {
1397     my $str = join ("\n",
1398                     map
1399                     {
1400                       $_ . "=" . freezequote ($Jobstep->{$_})
1401                     } grep {
1402                       $_ !~ /^stderr|slotindex|node_fail/
1403                     } keys %$Jobstep);
1404     $freezer->write_data ($str."\n\n");
1405   }
1406   if (@jobstep_tomerge)
1407   {
1408     $freezer->write_data
1409         ("merge $jobstep_tomerge_level "
1410          . freezequote (join ("\n",
1411                               map { freezequote ($_) } @jobstep_tomerge))
1412          . "\n\n");
1413   }
1414
1415   $freezer->write_finish;
1416   my $frozentokey = $freezer->as_key;
1417   undef $freezer;
1418   Log (undef, "frozento key is $frozentokey");
1419   dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1420           $frozentokey, $job_id);
1421   my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1422   Log (undef, "frozento+K key is $kfrozentokey");
1423   return $frozentokey;
1424 }
1425
1426
1427 sub thaw
1428 {
1429   my $key = shift;
1430   Log (undef, "thaw from $key");
1431
1432   @jobstep = ();
1433   @jobstep_done = ();
1434   @jobstep_todo = ();
1435   @jobstep_tomerge = ();
1436   $jobstep_tomerge_level = 0;
1437   my $frozenjob = {};
1438
1439   my $stream = new Warehouse::Stream ( whc => $whc,
1440                                        hash => [split (",", $key)] );
1441   $stream->rewind;
1442   while (my $dataref = $stream->read_until (undef, "\n\n"))
1443   {
1444     if ($$dataref =~ /^job /)
1445     {
1446       foreach (split ("\n", $$dataref))
1447       {
1448         my ($k, $v) = split ("=", $_, 2);
1449         $frozenjob->{$k} = freezeunquote ($v);
1450       }
1451       next;
1452     }
1453
1454     if ($$dataref =~ /^merge (\d+) (.*)/)
1455     {
1456       $jobstep_tomerge_level = $1;
1457       @jobstep_tomerge
1458           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1459       next;
1460     }
1461
1462     my $Jobstep = { };
1463     foreach (split ("\n", $$dataref))
1464     {
1465       my ($k, $v) = split ("=", $_, 2);
1466       $Jobstep->{$k} = freezeunquote ($v) if $k;
1467     }
1468     $Jobstep->{attempts} = 0;
1469     push @jobstep, $Jobstep;
1470
1471     if ($Jobstep->{exitcode} eq "0")
1472     {
1473       push @jobstep_done, $#jobstep;
1474     }
1475     else
1476     {
1477       push @jobstep_todo, $#jobstep;
1478     }
1479   }
1480
1481   foreach (qw (mrfunction revision inputkey knobs))
1482   {
1483     $Job->{$_} = $frozenjob->{$_};
1484   }
1485   dbh_do
1486       ("update mrjob
1487         set mrfunction=?, revision=?, input0=?, knobs=?
1488         where id=?", undef,
1489        $Job->{mrfunction},
1490        $Job->{revision},
1491        $Job->{inputkey},
1492        $Job->{knobs},
1493        $Job->{id},
1494       );
1495 }
1496
1497
1498 sub freezequote
1499 {
1500   my $s = shift;
1501   $s =~ s/\\/\\\\/g;
1502   $s =~ s/\n/\\n/g;
1503   return $s;
1504 }
1505
1506
1507 sub freezeunquote
1508 {
1509   my $s = shift;
1510   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1511   return $s;
1512 }
1513
1514
1515 sub srun
1516 {
1517   my $srunargs = shift;
1518   my $execargs = shift;
1519   my $opts = shift || {};
1520   my $stdin = shift;
1521   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1522   print STDERR (join (" ",
1523                       map { / / ? "'$_'" : $_ }
1524                       (@$args)),
1525                 "\n")
1526       if $ENV{MR_DEBUG};
1527
1528   if (defined $stdin) {
1529     my $child = open STDIN, "-|";
1530     defined $child or die "no fork: $!";
1531     if ($child == 0) {
1532       print $stdin or die $!;
1533       close STDOUT or die $!;
1534       exit 0;
1535     }
1536   }
1537
1538   return system (@$args) if $opts->{fork};
1539
1540   exec @$args;
1541   warn "ENV size is ".length(join(" ",%ENV));
1542   die "exec failed: $!: @$args";
1543 }
1544
1545
1546 sub ban_node_by_slot {
1547   # Don't start any new jobsteps on this node for 60 seconds
1548   my $slotid = shift;
1549   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1550   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1551 }