2 # -*- mode: perl; perl-indent-level: 2; -*-
6 whjobmanager: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from database, run tasks on compute nodes
11 (typically invoked by scheduler on cloud controller):
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 whjobmanager revision=PATH mrfunction=FUNC inputkey=LOCATOR \
19 [stepspernode=N] [SOMEKNOB=value] ...
21 =head1 RUNNING JOBS LOCALLY
23 whjobmanager(1p)'s log messages appear on stderr, and are saved in the
24 warehouse at each checkpoint and when the job finishes.
26 If the job succeeds, the job's output locator is printed on stdout.
28 If a job step outputs anything to stderr, it appears in
29 whjobmanager(1p)'s log when the step finishes.
31 While the job is running, the following signals are accepted:
35 =item control-C, SIGINT, SIGQUIT
37 Save a checkpoint, terminate any job steps that are running, and stop.
41 Save a checkpoint and continue.
54 use POSIX ':sys_wait_h';
55 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
57 use Warehouse::Stream;
59 $ENV{"TMPDIR"} ||= "/tmp";
61 do '/etc/warehouse/warehouse-server.conf';
63 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
64 my $have_database = @ARGV == 1 && $ARGV[0] =~ /^\d+$/;
69 $main::ENV{MR_DEBUG} = 1;
73 $main::ENV{MR_DEBUG} = 0;
78 my $whc = new Warehouse or croak ("failed to create Warehouse client");
79 my $metastream = new Warehouse::Stream (whc => $whc);
81 $metastream->name (".");
82 $metastream->write_start ("log.txt");
94 $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
95 croak ($DBI::errstr) if !$dbh;
96 $dbh->{InactiveDestroy} = 1;
98 $sth = $dbh->prepare ("select * from mrjob where id=?");
99 $sth->execute ($job_id) or croak ($dbh->errstr);
100 $Job = $sth->fetchrow_hashref or croak ($sth->errstr);
108 if (/([a-z].*?)=(.*)/) {
110 } elsif (/(.*?)=(.*)/) {
114 $Job->{knobs} = join ("\n", map { "$_=$knob{$_}" } sort keys %knob);
116 if (!$Job->{thawedfromkey})
118 map { croak ("No $_ specified") unless $Job->{$_} }
119 qw(mrfunction revision inputkey);
122 if (!defined $Job->{id}) {
123 chomp ($Job->{id} = sprintf ("%d.%d\@%s", time, $$, `hostname`));
125 $job_id = $Job->{id};
130 $Job->{inputkey} = $Job->{input0} if !exists $Job->{inputkey};
131 delete $Job->{input0};
136 map { $max_ncpus = $1 if /^STEPSPERNODE=(.*)/ } split ("\n", $$Job{knobs});
137 $max_ncpus = $1 if $$Job{nodes} =~ /\&(\d+)/;
138 $max_ncpus = $$Job{stepspernode} if $$Job{stepspernode};
143 Log (undef, "check slurm allocation");
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
150 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151 push @sinfo, "$localcpus localhost";
153 if (exists $ENV{SLURM_NODELIST})
155 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
159 my ($ncpus, $slurm_nodelist) = split;
160 $ncpus = $max_ncpus if defined ($max_ncpus) && $ncpus > $max_ncpus && $max_ncpus > 0;
161 $maxstepspernode = $ncpus if !defined $maxstepspernode || $maxstepspernode < $ncpus;
164 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
167 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
170 foreach (split (",", $ranges))
183 push @nodelist, map {
185 $n =~ s/\[[-,\d]+\]/$_/;
192 push @nodelist, $nodelist;
195 foreach my $nodename (@nodelist)
197 Log (undef, "node $nodename - $ncpus slots");
198 my $node = { name => $nodename,
202 foreach my $cpu (1..$ncpus)
204 push @slot, { node => $node,
208 push @node, @nodelist;
213 # Ensure that we get one jobstep running on each allocated node before
214 # we start overloading nodes with concurrent steps
216 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
223 # Claim this job, and make sure nobody else does
225 $sth = $dbh->prepare ("insert into mrjobmanager
226 (pid, revision, starttime)
227 values (?, ?, now())");
228 my $rev = q/$Revision$/;
230 $sth->execute ($$, +$&) or croak ($dbh->errstr);
232 $sth = $dbh->prepare ("select last_insert_id()");
233 $sth->execute or croak ($dbh->errstr);
234 ($jobmanager_id) = $sth->fetchrow_array;
236 $sth = $dbh->prepare ("update mrjob set jobmanager_id=?, starttime=now()
237 where id=? and jobmanager_id is null");
238 $sth->execute ($jobmanager_id, $job_id) or croak ($dbh->errstr);
240 $sth = $dbh->prepare ("select jobmanager_id from mrjob
242 $sth->execute ($job_id) or croak ($dbh->errstr);
243 my ($check_jobmanager_id) = $sth->fetchrow_array;
244 if ($check_jobmanager_id != $jobmanager_id)
246 # race condition - another job manager proc stole the job
248 "job taken by jobmanager id $check_jobmanager_id");
254 Log (undef, "start");
255 $SIG{'INT'} = sub { $main::please_freeze = 1; };
256 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
257 $SIG{'TERM'} = \&croak;
258 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
259 $SIG{'ALRM'} = sub { $main::please_info = 1; };
260 $SIG{'CONT'} = sub { $main::please_continue = 1; };
261 $main::please_freeze = 0;
262 $main::please_info = 0;
263 $main::please_continue = 0;
264 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
266 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
267 $ENV{"MR_JOB_ID"} = $job_id;
268 $ENV{"JOB_UUID"} = $job_id;
272 my @jobstep_todo = ();
273 my @jobstep_done = ();
274 my @jobstep_tomerge = ();
275 my $jobstep_tomerge_level = 0;
277 my $squeue_kill_checked;
278 my $output_in_keep = 0;
282 if (defined $Job->{thawedfromkey})
284 thaw ($Job->{thawedfromkey});
288 push @jobstep, { input => $Job->{inputkey},
292 push @jobstep_todo, 0;
297 mkdir ($ENV{"TMPDIR"}."/mrcompute");
298 if ($$Job{knobs} =~ /^GPG_KEYS=(.*)/m) {
299 # set up a fresh gnupg directory just for this process
300 # TODO: reap abandoned gnupg dirs
301 system ("rm", "-rf", $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$");
302 mkdir ($ENV{"TMPDIR"}."/mrcompute");
303 mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg", 0700);
304 mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg/$$", 0700) || croak ("mkdir: $!");
306 my $newhomedir = $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$";
308 open C, ">", $newhomedir."/gpg.conf";
309 print C "always-trust\n";
312 # import secret keys referenced in job spec
314 $hashes =~ s/\'/\'\\\'\'/g;
315 my $gpg_out = `whget '$hashes' - | gpg --homedir "$newhomedir" --import 2>&1`;
317 while ($gpg_out =~ /^gpg: key ([0-9A-F]{8}): /gm) {
319 while (`gpg --homedir "$newhomedir" --list-keys "$keynum"` =~ /^uid\s.*<(.+?)>/gm) {
324 croak ("GPG_KEYS provided but failed to import keys:\n$gpg_out");
327 if ($have_database) {
329 # make sure the job request was signed by all of the secret keys
330 # contained in GPG_KEYS (otherwise, any VM can just copy the
331 # GPG_KEYS hash from an existing mr-job and submit new jobs that can
335 my $seckeys = `gpg --homedir "$newhomedir" --list-secret-keys --with-fingerprint`;
336 while ($seckeys =~ /Key fingerprint.*?([0-9A-F][0-9A-F ]+[0-9A-F])/mgi) {
337 $did_not_sign{$1} = 1;
339 my $srfile = "$newhomedir/signedrequest";
340 open SREQ, ">", $srfile;
341 print SREQ $$Job{"signedrequest"};
343 my $gpg_v = `gpg --homedir "$newhomedir" --verify --with-fingerprint "$srfile" 2>&1 && echo ok`;
345 if ($gpg_v =~ /\nok\n$/s) {
346 while ($gpg_v =~ /Good signature.*? key fingerprint: (\S[^\n]+\S)/sgi) {
347 delete $did_not_sign{$1};
352 "Some secret keys provided did not sign this job request:",
353 keys %did_not_sign) . "\n");
357 my $hostname = `hostname`;
360 # tell mrjobsteps the decrypted secret key(s) and all public key(s) they might need
361 $ENV{"GPG_KEYS"} = `gpg --homedir "$newhomedir" --export-secret-keys --armor`;
362 $ENV{"GPG_PUBLIC_KEYS"} = `gpg --export --armor | ENCRYPT_TO= whput -`;
364 # import all secret keys from my real home dir
365 `gpg --export-secret-keys | gpg --homedir "$newhomedir" --import 2>&1`;
367 # use the new gnupg dir from now on
368 $ENV{"GNUPGHOME"} = $newhomedir;
370 # if I have a secret key for root@{host} or {user}@{host} or
371 # {configured-controller-gpg-uid}, add that as a recipient too so
372 # I'll be able to read frozentokeys etc. later
374 while (`gpg --list-secret-keys` =~ /^uid\s.*?<(.+?)>/gm) {
377 my $encrypting_to_self = 0;
378 my @try_these_uids = ("root\@".$hostname, $ENV{"USER"}."\@".$hostname);
379 push @try_these_uids, ($whc->{config}->{controller_gpg_uid})
380 if exists $whc->{config}->{controller_gpg_uid};
381 foreach my $id (@try_these_uids) {
382 if (exists $allkeys{$id}) {
383 $encrypt_to{$id} = 1;
384 $encrypting_to_self = 1;
389 if (!$encrypting_to_self) {
390 croak ("Failed to find a secret key for any of [@try_these_uids] -- giving up instead of writing meta/freeze data that I won't be able to read");
393 # tell the client library (and child procs and jobsteps) to encrypt using these keys
394 $ENV{"ENCRYPT_TO"} = join (",", sort keys %encrypt_to);
395 Log (undef, "encrypt_to ('".$ENV{"ENCRYPT_TO"}."')");
396 $whc->set_config ("encrypt_to", $ENV{"ENCRYPT_TO"});
401 $ENV{"MR_REVISION"} = $Job->{revision};
403 my $git_build_script;
404 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
407 $ENV{"MR_REVISION_SRCDIR"} = $Job->{revision};
411 Log (undef, "Install revision ".$Job->{revision});
412 my $nodelist = join(",", @node);
414 # Clean out mrcompute/work and mrcompute/opt
416 my $cleanpid = fork();
419 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{TMPDIR}],
420 ['bash', '-c', 'if mount | grep -q $TMPDIR/mrcompute/work/; then sudo /bin/umount $TMPDIR/mrcompute/work/* 2>/dev/null; fi; sleep 1; rm -rf $TMPDIR/mrcompute/work $TMPDIR/mrcompute/opt']);
425 last if $cleanpid == waitpid (-1, WNOHANG);
426 freeze_if_want_freeze ($cleanpid);
427 select (undef, undef, undef, 0.1);
429 Log (undef, "Clean-work-dir exited $?");
431 # Install requested code version
435 my @srunargs = ("srun",
436 "--nodelist=$nodelist",
437 "-D", $ENV{TMPDIR}, "--job-name=$job_id");
439 $ENV{"MR_REVISION"} = $Job->{revision};
440 $ENV{"MR_REVISION_SRCDIR"} = "$ENV{TMPDIR}/mrcompute/warehouse-apps";
441 $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/opt";
444 my $treeish = $Job->{revision};
445 my $repo = $Job->{git_clone_url} || $whc->get_config("git_clone_url");
447 # Create/update our clone of the remote git repo
449 if (!-d $ENV{MR_REVISION_SRCDIR}) {
450 system(qw(git clone), $repo, $ENV{MR_REVISION_SRCDIR}) == 0
451 or croak ("git clone $repo failed: exit ".($?>>8));
452 system("cd $ENV{MR_REVISION_SRCDIR} && git config clean.requireForce false");
454 `cd $ENV{MR_REVISION_SRCDIR} && git fetch -q`;
456 # If this looks like a subversion r#, look for it in git-svn commit messages
458 if ($treeish =~ m{^\d{1,4}$}) {
459 my $gitlog = `cd $ENV{MR_REVISION_SRCDIR} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
461 if ($gitlog =~ /^[a-f0-9]{40}$/) {
463 Log (undef, "Using commit $commit for revision $treeish");
467 # If that didn't work, try asking git to look it up as a tree-ish.
469 if (!defined $commit) {
471 my $cooked_treeish = $treeish;
472 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
473 # Looks like a git branch name -- make sure git knows it's
474 # relative to the remote repo
475 $cooked_treeish = "origin/$treeish";
478 my $found = `cd $ENV{MR_REVISION_SRCDIR} && git rev-list -1 $cooked_treeish`;
480 if ($found =~ /^[0-9a-f]{40}$/s) {
482 if ($commit ne $treeish) {
483 # Make sure we record the real commit id in the database,
484 # frozentokey, logs, etc. -- instead of an abbreviation or a
485 # branch name which can become ambiguous or point to a
486 # different commit in the future.
487 $ENV{"MR_REVISION"} = $commit;
488 $Job->{revision} = $commit;
490 ("update mrjob set revision=? where id=?",
492 $Job->{revision}, $Job->{id});
493 Log (undef, "Using commit $commit for tree-ish $treeish");
498 if (defined $commit) {
499 $ENV{"MR_GIT_COMMIT"} = $commit;
500 $ENV{"MR_GIT_CLONE_URL"} = $repo;
501 @execargs = ("sh", "-c",
502 "mkdir -p $ENV{TMPDIR}/mrcompute/opt && cd $ENV{TMPDIR}/mrcompute && perl - $ENV{MR_REVISION_SRCDIR} $commit $repo");
503 open GBS, "<", `echo -n \$(which whjob-checkout-and-build)`
504 or croak ("can't find whjob-checkout-and-build");
506 $git_build_script = <GBS>;
508 $build_script = $git_build_script;
510 elsif ($treeish =~ m{^(\d{1,5})$}) {
511 # Want a subversion r# but couldn't find it in git-svn history -
512 # might as well try using the subversion repo in case it's still
514 $ENV{"INSTALL_REPOS"} = $whc->get_config("svn_root");
515 $ENV{"INSTALL_REVISION"} = $Job->{revision};
516 $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/revision/$treeish";
517 $ENV{"MR_REVISION_SRCDIR"} = "$ENV{MR_REVISION_INSTALLDIR}/src";
518 @execargs = ("sh", "-c",
519 "mkdir -p $ENV{TMPDIR}/mrcompute/revision && cd $ENV{TMPDIR}/mrcompute && ( [ -e $ENV{MR_REVISION_INSTALLDIR}/.tested ] || ( svn export --quiet \"\$INSTALL_REPOS/installrevision\" && INSTALLREVISION_NOLOCK=1 ./installrevision ) )");
522 croak ("could not figure out commit id for $treeish");
525 my $installpid = fork();
526 if ($installpid == 0)
528 srun (\@srunargs, \@execargs, {}, $build_script);
533 last if $installpid == waitpid (-1, WNOHANG);
534 freeze_if_want_freeze ($installpid);
535 select (undef, undef, undef, 0.1);
537 Log (undef, "Install exited $?");
542 foreach (qw (mrfunction revision nodes stepspernode inputkey))
544 Log (undef, $_ . " " . $Job->{$_});
546 foreach (split (/\n/, $Job->{knobs}))
548 Log (undef, "knob " . $_);
559 my $thisround_succeeded = 0;
560 my $thisround_failed = 0;
561 my $thisround_failed_multiple = 0;
563 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
564 or $a <=> $b } @jobstep_todo;
565 my $level = $jobstep[$jobstep_todo[0]]->{level};
566 Log (undef, "start level $level");
571 my @freeslot = (0..$#slot);
574 my ($id, $input, $attempts);
575 my $progress_is_dirty = 1;
576 my $progress_stats_updated = 0;
578 update_progress_stats();
583 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
585 $main::please_continue = 0;
587 my $id = $jobstep_todo[$todo_ptr];
588 my $Jobstep = $jobstep[$id];
589 if ($Jobstep->{level} != $level)
593 if ($Jobstep->{attempts} > 9)
595 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
600 pipe $reader{$id}, "writer" or croak ($!);
601 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
602 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
604 my $childslot = $freeslot[0];
605 my $childnode = $slot[$childslot]->{node};
606 my $childslotname = join (".",
607 $slot[$childslot]->{node}->{name},
608 $slot[$childslot]->{cpu});
609 my $childpid = fork();
612 $SIG{'INT'} = 'DEFAULT';
613 $SIG{'QUIT'} = 'DEFAULT';
614 $SIG{'TERM'} = 'DEFAULT';
616 foreach (values (%reader))
620 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
621 open(STDOUT,">&writer");
622 open(STDERR,">&writer");
628 delete $ENV{"GNUPGHOME"};
630 $ENV{"MR_INPUT"} = $Jobstep->{input};
631 $ENV{"MR_KNOBS"} = $Job->{knobs};
632 $ENV{"MR_LEVEL"} = $level;
633 $ENV{"MR_FUNCTION"} = $Job->{mrfunction};
634 $ENV{"MR_INPUT0"} = $Job->{inputkey};
635 $ENV{"MR_INPUTKEY"} = $Job->{inputkey};
636 $ENV{"MR_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
637 $ENV{"MR_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
638 $ENV{"MR_SLOT"} = $slot[$childslot]->{cpu}; # deprecated
639 $ENV{"MR_JOB_TMP"} = $ENV{"TMPDIR"}."/job/work";
640 $ENV{"MR_JOBSTEP_TMP"} = $ENV{"TMPDIR"}."/job/work/".$slot[$childslot]->{cpu};
641 $ENV{"MR_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
642 $ENV{"MOGILEFS_TRACKERS"} = join (",", @main::mogilefs_trackers);
643 $ENV{"MOGILEFS_DOMAIN"} = $main::mogilefs_default_domain;
644 $ENV{"MOGILEFS_CLASS"} = $main::mogilefs_default_class;
646 $ENV{"TASK_UUID"} = $ENV{"JOB_UUID"} . "-" . $id;
647 $ENV{"TASK_QSEQUENCE"} = $id;
648 $ENV{"TASK_SEQUENCE"} = $Jobstep->{level};
654 "--nodelist=".$childnode->{name},
655 qw(-n1 -c1 -N1 -D), $ENV{TMPDIR},
656 "--job-name=$job_id.$id.$$",
658 my @execargs = qw(sh);
661 "mkdir -p $ENV{TMPDIR}/mrcompute/revision "
662 ."&& cd $ENV{TMPDIR}/mrcompute ";
663 if ($git_build_script)
665 $script = $git_build_script;
667 "&& perl - $ENV{MR_REVISION_SRCDIR} $ENV{MR_GIT_COMMIT} $ENV{MR_GIT_CLONE_URL}";
669 elsif (!$skip_install)
674 ." [ -e '$ENV{MR_REVISION_INSTALLDIR}/.tested' ] "
676 ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
677 ." && ./installrevision "
681 if (exists $ENV{GPG_KEYS}) {
683 "&& mkdir -p '$ENV{MR_JOBSTEP_TMP}' && (sudo /bin/umount '$ENV{MR_JOBSTEP_TMP}' 2>/dev/null || true) && rm -rf '$ENV{MR_JOBSTEP_TMP}' && exec $ENV{MR_REVISION_SRCDIR}/mapreduce/ecryptfs-wrapper -d '$ENV{MR_JOBSTEP_TMP}' -p $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager";
686 "&& exec $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager";
688 my @execargs = ('bash', '-c', $command);
689 srun (\@srunargs, \@execargs, undef, $script);
693 if (!defined $childpid)
700 $proc{$childpid} = { jobstep => $id,
703 jobstepname => "$job_id.$id.$childpid",
705 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
706 $slot[$childslot]->{pid} = $childpid;
708 Log ($id, "child $childpid started on $childslotname");
709 $Jobstep->{attempts} ++;
710 $Jobstep->{starttime} = time;
711 $Jobstep->{node} = $childnode->{name};
712 $Jobstep->{slotindex} = $childslot;
713 delete $Jobstep->{stderr};
714 delete $Jobstep->{output};
715 delete $Jobstep->{finishtime};
717 splice @jobstep_todo, $todo_ptr, 1;
720 $progress_is_dirty = 1;
724 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
726 last THISROUND if $main::please_freeze;
727 if ($main::please_info)
729 $main::please_info = 0;
733 update_progress_stats();
741 update_progress_stats();
742 select (undef, undef, undef, 0.1);
744 elsif (time - $progress_stats_updated >= 30)
746 update_progress_stats();
748 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
749 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
751 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
752 .($thisround_failed+$thisround_succeeded)
753 .") -- giving up on this round";
754 Log (undef, $message);
758 # move slots from freeslot to holdslot (or back to freeslot) if necessary
759 for (my $i=$#freeslot; $i>=0; $i--) {
760 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
761 push @holdslot, (splice @freeslot, $i, 1);
764 for (my $i=$#holdslot; $i>=0; $i--) {
765 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
766 push @freeslot, (splice @holdslot, $i, 1);
770 # give up if no nodes are succeeding
771 if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
772 my $message = "Every node has failed -- giving up on this round";
773 Log (undef, $message);
780 push @freeslot, splice @holdslot;
781 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
784 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
787 goto THISROUND if $main::please_continue;
788 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
793 update_progress_stats();
794 select (undef, undef, undef, 0.1);
795 killem (keys %proc) if $main::please_freeze;
799 update_progress_stats();
800 freeze_if_want_freeze();
803 if (@jobstep_tomerge && !@jobstep_todo)
805 push @jobstep, { input => join ("\n", splice @jobstep_tomerge, 0),
806 level => $jobstep_tomerge_level,
808 push @jobstep_todo, $#jobstep;
812 if (!defined $success)
815 $thisround_succeeded == 0 &&
816 ($thisround_failed == 0 || $thisround_failed > 4))
818 my $message = "stop because $thisround_failed tasks failed and none succeeded";
819 Log (undef, $message);
828 goto ONELEVEL if !defined $success;
831 release_allocation();
833 my $key = &collate_output();
834 $success = 0 if !$key;
840 foreach my $hash (split ",", $key)
842 my $keephash = $whc->store_in_keep (hash => $hash,
846 Log (undef, "store_in_keep (\"$hash\") failed: ".$whc->errstr);
849 push @keepkey, $keephash;
851 my $keepkey = join (",", @keepkey);
852 Log (undef, "outputkey+K $keepkey");
853 print "$keepkey\n" if $success;
860 dbh_do ("update mrjob set output=? where id=?", undef,
862 or croak ($dbh->errstr);
864 $whc->store_manifest_by_name ($keepkey, undef, "/job$job_id")
865 or Log (undef, "store_manifest_by_name (\"$key\", \"/job$job_id\") failed: ".$whc->errstr);
869 Log (undef, "finish");
871 dbh_do ("update mrjob set finishtime=now(), success=?
872 where id=? and jobmanager_id=?", undef,
873 $success, $job_id, $jobmanager_id)
874 or croak ($dbh->errstr);
881 sub update_progress_stats
883 $progress_stats_updated = time;
884 return if !$progress_is_dirty;
885 my ($todo, $done, $running) = (scalar @jobstep_todo,
886 scalar @jobstep_done,
887 scalar @slot - scalar @freeslot - scalar @holdslot);
889 ("update mrjob set steps_todo=?,steps_done=?,steps_running=? where id=?",
891 $todo, $done, $running, $job_id);
892 Log (undef, "status: $done done, $running running, $todo todo");
893 $progress_is_dirty = 0;
900 my $pid = waitpid (-1, WNOHANG);
901 return 0 if $pid <= 0;
903 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
905 . $slot[$proc{$pid}->{slot}]->{cpu});
906 my $jobstepid = $proc{$pid}->{jobstep};
907 my $elapsed = time - $proc{$pid}->{time};
908 my $Jobstep = $jobstep[$jobstepid];
910 process_stderr_for_output_key ($jobstepid);
913 my $exitinfo = "exit $exitcode";
914 if (!exists $Jobstep->{output})
916 $exitinfo .= " with no output key";
917 $exitcode = -1 if $exitcode == 0 && $jobsteps_must_output_keys;
920 if ($exitcode == 0 && $Jobstep->{node_fail}) {
921 $exitinfo .= " but recording as failure";
925 Log ($jobstepid, "child $pid on $whatslot $exitinfo");
927 if ($exitcode != 0 || $Jobstep->{node_fail})
929 --$Jobstep->{attempts} if $Jobstep->{node_fail};
931 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
933 # Check for signs of a failed or misconfigured node
934 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
935 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
936 # Don't count this against jobstep failure thresholds if this
937 # node is already suspected faulty and srun exited quickly
938 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
940 $Jobstep->{attempts} > 1) {
941 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
942 --$Jobstep->{attempts};
944 ban_node_by_slot($proc{$pid}->{slot});
947 push @jobstep_todo, $jobstepid;
948 Log ($jobstepid, "failure in $elapsed seconds");
952 ++$thisround_succeeded;
953 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
954 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
955 push @jobstep_done, $jobstepid;
956 Log ($jobstepid, "success in $elapsed seconds");
958 $Jobstep->{exitcode} = $exitcode;
959 $Jobstep->{finishtime} = time;
960 process_stderr ($jobstepid, $exitcode == 0);
961 Log ($jobstepid, "output $$Jobstep{output}");
963 close $reader{$jobstepid};
964 delete $reader{$jobstepid};
965 delete $slot[$proc{$pid}->{slot}]->{pid};
966 push @freeslot, $proc{$pid}->{slot};
969 $progress_is_dirty = 1;
976 # return if the kill list was checked <4 seconds ago
977 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
981 $squeue_kill_checked = time;
983 # use killem() on procs whose killtime is reached
986 if (exists $proc{$_}->{killtime}
987 && $proc{$_}->{killtime} <= time)
993 # return if the squeue was checked <60 seconds ago
994 if (defined $squeue_checked && $squeue_checked > time - 60)
998 $squeue_checked = time;
1002 # here is an opportunity to check for mysterious problems with local procs
1006 # get a list of steps still running
1007 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1009 if ($squeue[-1] ne "ok")
1015 # which of my jobsteps are running, according to squeue?
1019 if (/^(\d+)\.(\d+) (\S+)/)
1021 if ($1 eq $ENV{SLURM_JOBID})
1028 # which of my active child procs (>60s old) were not mentioned by squeue?
1029 foreach (keys %proc)
1031 if ($proc{$_}->{time} < time - 60
1032 && !exists $ok{$proc{$_}->{jobstepname}}
1033 && !exists $proc{$_}->{killtime})
1035 # kill this proc if it hasn't exited in 30 seconds
1036 $proc{$_}->{killtime} = time + 30;
1042 sub release_allocation
1046 Log (undef, "release job allocation");
1047 system "scancel $ENV{SLURM_JOBID}";
1055 foreach my $job (keys %reader)
1058 while (0 < sysread ($reader{$job}, $buf, 8192))
1060 print STDERR $buf if $ENV{MR_DEBUG};
1061 $jobstep[$job]->{stderr} .= $buf;
1062 preprocess_stderr ($job);
1063 if (length ($jobstep[$job]->{stderr}) > 16384 &&
1064 $jobstep[$job]->{stderr} !~ /\+\+\+mr/)
1066 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1075 sub process_stderr_for_output_key
1078 while ($jobstep[$job]->{stderr} =~ s/\+\+\+mrout (.*?)\+\+\+\n//s)
1080 $jobstep[$job]->{output} = $1;
1081 $jobsteps_must_output_keys = 1;
1086 sub preprocess_stderr
1090 $jobstep[$job]->{stderr_jobsteps} = []
1091 if !exists $jobstep[$job]->{stderr_jobsteps};
1093 $jobstep[$job]->{stderr} =~
1094 s{\+\+\+mrjobstep((\/(\d+|\*))? (\d+) (.*?))\+\+\+\n}{
1095 push (@{ $jobstep[$job]->{stderr_jobsteps} }, $1);
1099 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1101 if ($line =~ /\+\+\+mr/) {
1104 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1105 Log ($job, "stderr $line");
1106 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
1108 $main::please_freeze = 1;
1110 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1111 $jobstep[$job]->{node_fail} = 1;
1112 ban_node_by_slot($jobstep[$job]->{slotindex});
1121 my $success = shift;
1122 preprocess_stderr ($job);
1125 Log ($job, "stderr $_");
1126 } split ("\n", $jobstep[$job]->{stderr});
1128 if (!$success || !exists $jobstep[$job]->{stderr_jobsteps})
1130 delete $jobstep[$job]->{stderr_jobsteps};
1134 foreach (@{ $jobstep[$job]->{stderr_jobsteps} })
1136 /^(?:\/(\d+|\*))? (\d+) (.*)/s;
1137 my ($merge, $level, $input) = ($1, $2, $3);
1141 push @jobstep_tomerge, $input;
1142 $jobstep_tomerge_level = $level;
1143 if ($merge !~ /\D/ && @jobstep_tomerge >= $merge)
1145 $newjobref = { input => join ("\n",
1146 splice @jobstep_tomerge, 0, $merge),
1153 $newjobref = { input => $input,
1159 push @jobstep, $newjobref;
1160 push @jobstep_todo, $#jobstep;
1163 delete $jobstep[$job]->{stderr_jobsteps};
1169 Log (undef, "collate");
1170 $whc->write_start (1);
1174 next if !exists $_->{output} || $_->{exitcode} != 0;
1175 my $output = $_->{output};
1176 if ($output !~ /^[0-9a-f]{32}/)
1178 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1179 $whc->write_data ($output);
1181 elsif (@jobstep == 1)
1186 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1188 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1189 $whc->write_data ($outblock);
1193 my $errstr = $whc->errstr;
1194 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1198 $key = $whc->write_finish if !defined $key;
1201 Log (undef, "outputkey $key");
1202 dbh_do ("update mrjob set output=? where id=?", undef,
1204 or Log (undef, "db update failed: ".$DBI::errstr);
1208 Log (undef, "outputkey undef");
1218 my $sig = 2; # SIGINT first
1219 if (exists $proc{$_}->{"sent_$sig"} &&
1220 time - $proc{$_}->{"sent_$sig"} > 4)
1222 $sig = 15; # SIGTERM if SIGINT doesn't work
1224 if (exists $proc{$_}->{"sent_$sig"} &&
1225 time - $proc{$_}->{"sent_$sig"} > 4)
1227 $sig = 9; # SIGKILL if SIGTERM doesn't work
1229 if (!exists $proc{$_}->{"sent_$sig"})
1231 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1233 select (undef, undef, undef, 0.1);
1236 kill $sig, $_; # srun wants two SIGINT to really interrupt
1238 $proc{$_}->{"sent_$sig"} = time;
1239 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1249 vec($bits,fileno($_),1) = 1;
1255 sub Log # ($jobstep_id, $logmessage)
1257 if ($_[1] =~ /\n/) {
1258 for my $line (split (/\n/, $_[1])) {
1263 my $fh = select STDERR; $|=1; select $fh;
1264 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1265 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1268 if ($metastream || -t STDERR) {
1269 my @gmtime = gmtime;
1270 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1271 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1273 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1275 return if !$metastream;
1276 $metastream->write_data ($datetime . " " . $message);
1280 sub reconnect_database
1282 return if !$have_database;
1283 return if ($dbh && $dbh->do ("select now()"));
1286 $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1288 $dbh->{InactiveDestroy} = 1;
1291 warn ($DBI::errstr);
1294 croak ($DBI::errstr) if !$dbh;
1300 return 1 if !$have_database;
1301 my $ret = $dbh->do (@_);
1302 return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1303 reconnect_database();
1304 return $dbh->do (@_);
1310 my ($package, $file, $line) = caller;
1311 my $message = "@_ at $file line $line\n";
1312 Log (undef, $message);
1313 freeze() if @jobstep_todo;
1314 collate_output() if @jobstep_todo;
1316 save_meta() if $metastream;
1323 return if !$have_database || !$dbh;
1325 reconnect_database();
1327 $sth = $dbh->prepare ("update mrjobmanager set finishtime=now() where id=?");
1328 $sth->execute ($jobmanager_id);
1329 $sth = $dbh->prepare ("update mrjob set success=0, finishtime=now() where id=? and jobmanager_id=? and finishtime is null");
1330 $sth->execute ($job_id, $jobmanager_id);
1336 reconnect_database();
1337 my $justcheckpoint = shift; # false if this will be the last meta saved
1338 my $m = $metastream;
1339 $m = $m->copy if $justcheckpoint;
1341 my $key = $m->as_key;
1342 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1343 Log (undef, "meta key is $key");
1344 dbh_do ("update mrjob set metakey=? where id=?",
1350 sub freeze_if_want_freeze
1352 if ($main::please_freeze)
1354 release_allocation();
1357 # kill some srun procs before freeze+stop
1358 map { $proc{$_} = {} } @_;
1361 killem (keys %proc);
1362 select (undef, undef, undef, 0.1);
1364 while (($died = waitpid (-1, WNOHANG)) > 0)
1366 delete $proc{$died};
1381 Log (undef, "freeze");
1383 my $freezer = new Warehouse::Stream (whc => $whc);
1385 $freezer->name (".");
1386 $freezer->write_start ("state.txt");
1388 $freezer->write_data (join ("\n",
1392 $_ . "=" . freezequote($Job->{$_})
1393 } grep { $_ ne "id" } keys %$Job) . "\n\n");
1395 foreach my $Jobstep (@jobstep)
1397 my $str = join ("\n",
1400 $_ . "=" . freezequote ($Jobstep->{$_})
1402 $_ !~ /^stderr|slotindex|node_fail/
1404 $freezer->write_data ($str."\n\n");
1406 if (@jobstep_tomerge)
1408 $freezer->write_data
1409 ("merge $jobstep_tomerge_level "
1410 . freezequote (join ("\n",
1411 map { freezequote ($_) } @jobstep_tomerge))
1415 $freezer->write_finish;
1416 my $frozentokey = $freezer->as_key;
1418 Log (undef, "frozento key is $frozentokey");
1419 dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1420 $frozentokey, $job_id);
1421 my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1422 Log (undef, "frozento+K key is $kfrozentokey");
1423 return $frozentokey;
1430 Log (undef, "thaw from $key");
1435 @jobstep_tomerge = ();
1436 $jobstep_tomerge_level = 0;
1439 my $stream = new Warehouse::Stream ( whc => $whc,
1440 hash => [split (",", $key)] );
1442 while (my $dataref = $stream->read_until (undef, "\n\n"))
1444 if ($$dataref =~ /^job /)
1446 foreach (split ("\n", $$dataref))
1448 my ($k, $v) = split ("=", $_, 2);
1449 $frozenjob->{$k} = freezeunquote ($v);
1454 if ($$dataref =~ /^merge (\d+) (.*)/)
1456 $jobstep_tomerge_level = $1;
1458 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1463 foreach (split ("\n", $$dataref))
1465 my ($k, $v) = split ("=", $_, 2);
1466 $Jobstep->{$k} = freezeunquote ($v) if $k;
1468 $Jobstep->{attempts} = 0;
1469 push @jobstep, $Jobstep;
1471 if ($Jobstep->{exitcode} eq "0")
1473 push @jobstep_done, $#jobstep;
1477 push @jobstep_todo, $#jobstep;
1481 foreach (qw (mrfunction revision inputkey knobs))
1483 $Job->{$_} = $frozenjob->{$_};
1487 set mrfunction=?, revision=?, input0=?, knobs=?
1510 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1517 my $srunargs = shift;
1518 my $execargs = shift;
1519 my $opts = shift || {};
1521 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1522 print STDERR (join (" ",
1523 map { / / ? "'$_'" : $_ }
1528 if (defined $stdin) {
1529 my $child = open STDIN, "-|";
1530 defined $child or die "no fork: $!";
1532 print $stdin or die $!;
1533 close STDOUT or die $!;
1538 return system (@$args) if $opts->{fork};
1541 warn "ENV size is ".length(join(" ",%ENV));
1542 die "exec failed: $!: @$args";
1546 sub ban_node_by_slot {
1547 # Don't start any new jobsteps on this node for 60 seconds
1549 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1550 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");