#!/usr/bin/perl # -*- mode: perl; perl-indent-level: 2; -*- =head1 NAME whjobmanager: Execute job steps, save snapshots as requested, collate output. =head1 SYNOPSIS Obtain job details from database, run tasks on compute nodes (typically invoked by scheduler on cloud controller): whjobmanager jobid Obtain job details from command line, run tasks on local machine (typically invoked by application or developer on VM): whjobmanager revision=PATH mrfunction=FUNC inputkey=LOCATOR \ [stepspernode=N] [SOMEKNOB=value] ... =head1 RUNNING JOBS LOCALLY whjobmanager(1p)'s log messages appear on stderr, and are saved in the warehouse at each checkpoint and when the job finishes. If the job succeeds, the job's output locator is printed on stdout. If a job step outputs anything to stderr, it appears in whjobmanager(1p)'s log when the step finishes. While the job is running, the following signals are accepted: =over =item control-C, SIGINT, SIGQUIT Save a checkpoint, terminate any job steps that are running, and stop. =item SIGALRM Save a checkpoint and continue. =back =head1 SEE ALSO whintro(1p), wh(1p) =cut use strict; use DBI; use POSIX ':sys_wait_h'; use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); use Warehouse; use Warehouse::Stream; $ENV{"TMPDIR"} ||= "/tmp"; do '/etc/warehouse/warehouse-server.conf'; my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST}; my $have_database = @ARGV == 1 && $ARGV[0] =~ /^\d+$/; $SIG{'USR1'} = sub { $main::ENV{MR_DEBUG} = 1; }; $SIG{'USR2'} = sub { $main::ENV{MR_DEBUG} = 0; }; my $whc = new Warehouse or croak ("failed to create Warehouse client"); my $metastream = new Warehouse::Stream (whc => $whc); $metastream->clear; $metastream->name ("."); $metastream->write_start ("log.txt"); my $Job = {}; my $job_id; my $dbh; my $sth; if ($have_database) { ($job_id) = @ARGV; $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN); croak ($DBI::errstr) if !$dbh; $dbh->{InactiveDestroy} = 1; $sth = $dbh->prepare ("select * from mrjob where id=?"); $sth->execute ($job_id) or croak ($dbh->errstr); $Job = $sth->fetchrow_hashref or croak ($sth->errstr); } else { my %knob; foreach (@ARGV) { if (/([a-z].*?)=(.*)/) { $Job->{$1} = $2; } elsif (/(.*?)=(.*)/) { $knob{$1} = $2; } } $Job->{knobs} = join ("\n", map { "$_=$knob{$_}" } sort keys %knob); if (!$Job->{thawedfromkey}) { map { croak ("No $_ specified") unless $Job->{$_} } qw(mrfunction revision inputkey); } if (!defined $Job->{id}) { chomp ($Job->{id} = sprintf ("%d.%d\@%s", time, $$, `hostname`)); } $job_id = $Job->{id}; } $Job->{inputkey} = $Job->{input0} if !exists $Job->{inputkey}; delete $Job->{input0}; my $max_ncpus; map { $max_ncpus = $1 if /^STEPSPERNODE=(.*)/ } split ("\n", $$Job{knobs}); $max_ncpus = $1 if $$Job{nodes} =~ /\&(\d+)/; $max_ncpus = $$Job{stepspernode} if $$Job{stepspernode}; my $maxstepspernode; Log (undef, "check slurm allocation"); my @slot; my @node; # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)") my @sinfo; if (!$have_slurm) { my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1; push @sinfo, "$localcpus localhost"; } if (exists $ENV{SLURM_NODELIST}) { push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`; } foreach (@sinfo) { my ($ncpus, $slurm_nodelist) = split; $ncpus = $max_ncpus if defined ($max_ncpus) && $ncpus > $max_ncpus && $max_ncpus > 0; $maxstepspernode = $ncpus if !defined $maxstepspernode || $maxstepspernode < $ncpus; my @nodelist; while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//) { my $nodelist = $1; if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/) { my $ranges = $1; foreach (split (",", $ranges)) { my ($a, $b); if (/(\d+)-(\d+)/) { $a = $1; $b = $2; } else { $a = $_; $b = $_; } push @nodelist, map { my $n = $nodelist; $n =~ s/\[[-,\d]+\]/$_/; $n; } ($a..$b); } } else { push @nodelist, $nodelist; } } foreach my $nodename (@nodelist) { Log (undef, "node $nodename - $ncpus slots"); my $node = { name => $nodename, ncpus => $ncpus, losing_streak => 0, hold_until => 0 }; foreach my $cpu (1..$ncpus) { push @slot, { node => $node, cpu => $cpu }; } } push @node, @nodelist; } # Ensure that we get one jobstep running on each allocated node before # we start overloading nodes with concurrent steps @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot; my $jobmanager_id; if ($have_database) { # Claim this job, and make sure nobody else does $sth = $dbh->prepare ("insert into mrjobmanager (pid, revision, starttime) values (?, ?, now())"); my $rev = q/$Revision$/; $rev =~ /\d+/; $sth->execute ($$, +$&) or croak ($dbh->errstr); $sth = $dbh->prepare ("select last_insert_id()"); $sth->execute or croak ($dbh->errstr); ($jobmanager_id) = $sth->fetchrow_array; $sth = $dbh->prepare ("update mrjob set jobmanager_id=?, starttime=now() where id=? and jobmanager_id is null"); $sth->execute ($jobmanager_id, $job_id) or croak ($dbh->errstr); $sth = $dbh->prepare ("select jobmanager_id from mrjob where id=?"); $sth->execute ($job_id) or croak ($dbh->errstr); my ($check_jobmanager_id) = $sth->fetchrow_array; if ($check_jobmanager_id != $jobmanager_id) { # race condition - another job manager proc stole the job Log (undef, "job taken by jobmanager id $check_jobmanager_id"); exit (1); } } Log (undef, "start"); $SIG{'INT'} = sub { $main::please_freeze = 1; }; $SIG{'QUIT'} = sub { $main::please_freeze = 1; }; $SIG{'TERM'} = \&croak; $SIG{'TSTP'} = sub { $main::please_freeze = 1; }; $SIG{'ALRM'} = sub { $main::please_info = 1; }; $SIG{'CONT'} = sub { $main::please_continue = 1; }; $main::please_freeze = 0; $main::please_info = 0; $main::please_continue = 0; my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs}); $ENV{"MR_JOB_ID"} = $job_id; $ENV{"JOB_UUID"} = $job_id; my @jobstep; my @jobstep_todo = (); my @jobstep_done = (); my @jobstep_tomerge = (); my $jobstep_tomerge_level = 0; my $squeue_checked; my $squeue_kill_checked; my $output_in_keep = 0; if (defined $Job->{thawedfromkey}) { thaw ($Job->{thawedfromkey}); } else { push @jobstep, { input => $Job->{inputkey}, level => 0, attempts => 0, }; push @jobstep_todo, 0; } mkdir ($ENV{"TMPDIR"}."/mrcompute"); if ($$Job{knobs} =~ /^GPG_KEYS=(.*)/m) { # set up a fresh gnupg directory just for this process # TODO: reap abandoned gnupg dirs system ("rm", "-rf", $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$"); mkdir ($ENV{"TMPDIR"}."/mrcompute"); mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg", 0700); mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg/$$", 0700) || croak ("mkdir: $!"); my $newhomedir = $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$"; open C, ">", $newhomedir."/gpg.conf"; print C "always-trust\n"; close C; # import secret keys referenced in job spec my $hashes = $1; $hashes =~ s/\'/\'\\\'\'/g; my $gpg_out = `whget '$hashes' - | gpg --homedir "$newhomedir" --import 2>&1`; my %encrypt_to; while ($gpg_out =~ /^gpg: key ([0-9A-F]{8}): /gm) { my $keynum = $1; while (`gpg --homedir "$newhomedir" --list-keys "$keynum"` =~ /^uid\s.*<(.+?)>/gm) { $encrypt_to{$1} = 1; } } if (!%encrypt_to) { croak ("GPG_KEYS provided but failed to import keys:\n$gpg_out"); } if ($have_database) { # make sure the job request was signed by all of the secret keys # contained in GPG_KEYS (otherwise, any VM can just copy the # GPG_KEYS hash from an existing mr-job and submit new jobs that can # read private data) my %did_not_sign; my $seckeys = `gpg --homedir "$newhomedir" --list-secret-keys --with-fingerprint`; while ($seckeys =~ /Key fingerprint.*?([0-9A-F][0-9A-F ]+[0-9A-F])/mgi) { $did_not_sign{$1} = 1; } my $srfile = "$newhomedir/signedrequest"; open SREQ, ">", $srfile; print SREQ $$Job{"signedrequest"}; close SREQ; my $gpg_v = `gpg --homedir "$newhomedir" --verify --with-fingerprint "$srfile" 2>&1 && echo ok`; unlink $srfile; if ($gpg_v =~ /\nok\n$/s) { while ($gpg_v =~ /Good signature.*? key fingerprint: (\S[^\n]+\S)/sgi) { delete $did_not_sign{$1}; } } if (%did_not_sign) { croak (join ("\n", "Some secret keys provided did not sign this job request:", keys %did_not_sign) . "\n"); } } my $hostname = `hostname`; chomp ($hostname); # tell mrjobsteps the decrypted secret key(s) and all public key(s) they might need $ENV{"GPG_KEYS"} = `gpg --homedir "$newhomedir" --export-secret-keys --armor`; $ENV{"GPG_PUBLIC_KEYS"} = `gpg --export --armor | ENCRYPT_TO= whput -`; # import all secret keys from my real home dir `gpg --export-secret-keys | gpg --homedir "$newhomedir" --import 2>&1`; # use the new gnupg dir from now on $ENV{"GNUPGHOME"} = $newhomedir; # if I have a secret key for root@{host} or {user}@{host} or # {configured-controller-gpg-uid}, add that as a recipient too so # I'll be able to read frozentokeys etc. later my %allkeys; while (`gpg --list-secret-keys` =~ /^uid\s.*?<(.+?)>/gm) { $allkeys{$1} = 1; } my $encrypting_to_self = 0; my @try_these_uids = ("root\@".$hostname, $ENV{"USER"}."\@".$hostname); push @try_these_uids, ($whc->{config}->{controller_gpg_uid}) if exists $whc->{config}->{controller_gpg_uid}; foreach my $id (@try_these_uids) { if (exists $allkeys{$id}) { $encrypt_to{$id} = 1; $encrypting_to_self = 1; last; } } if (!$encrypting_to_self) { croak ("Failed to find a secret key for any of [@try_these_uids] -- giving up instead of writing meta/freeze data that I won't be able to read"); } # tell the client library (and child procs and jobsteps) to encrypt using these keys $ENV{"ENCRYPT_TO"} = join (",", sort keys %encrypt_to); Log (undef, "encrypt_to ('".$ENV{"ENCRYPT_TO"}."')"); $whc->set_config ("encrypt_to", $ENV{"ENCRYPT_TO"}); } $ENV{"MR_REVISION"} = $Job->{revision}; my $git_build_script; my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/}); if ($skip_install) { $ENV{"MR_REVISION_SRCDIR"} = $Job->{revision}; } else { Log (undef, "Install revision ".$Job->{revision}); my $nodelist = join(",", @node); # Clean out mrcompute/work and mrcompute/opt my $cleanpid = fork(); if ($cleanpid == 0) { srun (["srun", "--nodelist=$nodelist", "-D", $ENV{TMPDIR}], ['bash', '-c', 'if mount | grep -q $TMPDIR/mrcompute/work/; then sudo /bin/umount $TMPDIR/mrcompute/work/* 2>/dev/null; fi; sleep 1; rm -rf $TMPDIR/mrcompute/work $TMPDIR/mrcompute/opt']); exit (1); } while (1) { last if $cleanpid == waitpid (-1, WNOHANG); freeze_if_want_freeze ($cleanpid); select (undef, undef, undef, 0.1); } Log (undef, "Clean-work-dir exited $?"); # Install requested code version my $build_script; my @execargs; my @srunargs = ("srun", "--nodelist=$nodelist", "-D", $ENV{TMPDIR}, "--job-name=$job_id"); $ENV{"MR_REVISION"} = $Job->{revision}; $ENV{"MR_REVISION_SRCDIR"} = "$ENV{TMPDIR}/mrcompute/warehouse-apps"; $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/opt"; my $commit; my $treeish = $Job->{revision}; my $repo = $Job->{git_clone_url} || $whc->get_config("git_clone_url"); # Create/update our clone of the remote git repo if (!-d $ENV{MR_REVISION_SRCDIR}) { system(qw(git clone), $repo, $ENV{MR_REVISION_SRCDIR}) == 0 or croak ("git clone $repo failed: exit ".($?>>8)); system("cd $ENV{MR_REVISION_SRCDIR} && git config clean.requireForce false"); } `cd $ENV{MR_REVISION_SRCDIR} && git fetch -q`; # If this looks like a subversion r#, look for it in git-svn commit messages if ($treeish =~ m{^\d{1,4}$}) { my $gitlog = `cd $ENV{MR_REVISION_SRCDIR} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`; chomp $gitlog; if ($gitlog =~ /^[a-f0-9]{40}$/) { $commit = $gitlog; Log (undef, "Using commit $commit for revision $treeish"); } } # If that didn't work, try asking git to look it up as a tree-ish. if (!defined $commit) { my $cooked_treeish = $treeish; if ($treeish !~ m{^[0-9a-f]{5,}$}) { # Looks like a git branch name -- make sure git knows it's # relative to the remote repo $cooked_treeish = "origin/$treeish"; } my $found = `cd $ENV{MR_REVISION_SRCDIR} && git rev-list -1 $cooked_treeish`; chomp $found; if ($found =~ /^[0-9a-f]{40}$/s) { $commit = $found; if ($commit ne $treeish) { # Make sure we record the real commit id in the database, # frozentokey, logs, etc. -- instead of an abbreviation or a # branch name which can become ambiguous or point to a # different commit in the future. $ENV{"MR_REVISION"} = $commit; $Job->{revision} = $commit; dbh_do ("update mrjob set revision=? where id=?", undef, $Job->{revision}, $Job->{id}); Log (undef, "Using commit $commit for tree-ish $treeish"); } } } if (defined $commit) { $ENV{"MR_GIT_COMMIT"} = $commit; $ENV{"MR_GIT_CLONE_URL"} = $repo; @execargs = ("sh", "-c", "mkdir -p $ENV{TMPDIR}/mrcompute/opt && cd $ENV{TMPDIR}/mrcompute && perl - $ENV{MR_REVISION_SRCDIR} $commit $repo"); open GBS, "<", `echo -n \$(which whjob-checkout-and-build)` or croak ("can't find whjob-checkout-and-build"); local $/ = undef; $git_build_script = ; close GBS; $build_script = $git_build_script; } elsif ($treeish =~ m{^(\d{1,5})$}) { # Want a subversion r# but couldn't find it in git-svn history - # might as well try using the subversion repo in case it's still # there. $ENV{"INSTALL_REPOS"} = $whc->get_config("svn_root"); $ENV{"INSTALL_REVISION"} = $Job->{revision}; $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/revision/$treeish"; $ENV{"MR_REVISION_SRCDIR"} = "$ENV{MR_REVISION_INSTALLDIR}/src"; @execargs = ("sh", "-c", "mkdir -p $ENV{TMPDIR}/mrcompute/revision && cd $ENV{TMPDIR}/mrcompute && ( [ -e $ENV{MR_REVISION_INSTALLDIR}/.tested ] || ( svn export --quiet \"\$INSTALL_REPOS/installrevision\" && INSTALLREVISION_NOLOCK=1 ./installrevision ) )"); } else { croak ("could not figure out commit id for $treeish"); } my $installpid = fork(); if ($installpid == 0) { srun (\@srunargs, \@execargs, {}, $build_script); exit (1); } while (1) { last if $installpid == waitpid (-1, WNOHANG); freeze_if_want_freeze ($installpid); select (undef, undef, undef, 0.1); } Log (undef, "Install exited $?"); } foreach (qw (mrfunction revision nodes stepspernode inputkey)) { Log (undef, $_ . " " . $Job->{$_}); } foreach (split (/\n/, $Job->{knobs})) { Log (undef, "knob " . $_); } my $success; ONELEVEL: my $thisround_succeeded = 0; my $thisround_failed = 0; my $thisround_failed_multiple = 0; @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level} or $a <=> $b } @jobstep_todo; my $level = $jobstep[$jobstep_todo[0]]->{level}; Log (undef, "start level $level"); my %proc; my @freeslot = (0..$#slot); my @holdslot; my %reader; my ($id, $input, $attempts); my $progress_is_dirty = 1; my $progress_stats_updated = 0; update_progress_stats(); THISROUND: for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { $main::please_continue = 0; my $id = $jobstep_todo[$todo_ptr]; my $Jobstep = $jobstep[$id]; if ($Jobstep->{level} != $level) { next; } if ($Jobstep->{attempts} > 9) { Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up"); $success = 0; last THISROUND; } pipe $reader{$id}, "writer" or croak ($!); my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!); fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!); my $childslot = $freeslot[0]; my $childnode = $slot[$childslot]->{node}; my $childslotname = join (".", $slot[$childslot]->{node}->{name}, $slot[$childslot]->{cpu}); my $childpid = fork(); if ($childpid == 0) { $SIG{'INT'} = 'DEFAULT'; $SIG{'QUIT'} = 'DEFAULT'; $SIG{'TERM'} = 'DEFAULT'; foreach (values (%reader)) { close($_); } fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec open(STDOUT,">&writer"); open(STDERR,">&writer"); undef $dbh; undef $sth; delete $ENV{"GNUPGHOME"}; $ENV{"MR_ID"} = $id; $ENV{"MR_INPUT"} = $Jobstep->{input}; $ENV{"MR_KNOBS"} = $Job->{knobs}; $ENV{"MR_LEVEL"} = $level; $ENV{"MR_FUNCTION"} = $Job->{mrfunction}; $ENV{"MR_INPUT0"} = $Job->{inputkey}; $ENV{"MR_INPUTKEY"} = $Job->{inputkey}; $ENV{"MR_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; $ENV{"MR_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; $ENV{"MR_SLOT"} = $slot[$childslot]->{cpu}; # deprecated $ENV{"MR_JOB_TMP"} = $ENV{"TMPDIR"}."/job/work"; $ENV{"MR_JOBSTEP_TMP"} = $ENV{"TMPDIR"}."/job/work/".$slot[$childslot]->{cpu}; $ENV{"MR_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; $ENV{"MOGILEFS_TRACKERS"} = join (",", @main::mogilefs_trackers); $ENV{"MOGILEFS_DOMAIN"} = $main::mogilefs_default_domain; $ENV{"MOGILEFS_CLASS"} = $main::mogilefs_default_class; $ENV{"TASK_UUID"} = $ENV{"JOB_UUID"} . "-" . $id; $ENV{"TASK_QSEQUENCE"} = $id; $ENV{"TASK_SEQUENCE"} = $Jobstep->{level}; $ENV{"GZIP"} = "-n"; my @srunargs = ( "srun", "--nodelist=".$childnode->{name}, qw(-n1 -c1 -N1 -D), $ENV{TMPDIR}, "--job-name=$job_id.$id.$$", ); my @execargs = qw(sh); my $script = ""; my $command = "mkdir -p $ENV{TMPDIR}/mrcompute/revision " ."&& cd $ENV{TMPDIR}/mrcompute "; if ($git_build_script) { $script = $git_build_script; $command .= "&& perl - $ENV{MR_REVISION_SRCDIR} $ENV{MR_GIT_COMMIT} $ENV{MR_GIT_CLONE_URL}"; } elsif (!$skip_install) { $command .= "&& " ."( " ." [ -e '$ENV{MR_REVISION_INSTALLDIR}/.tested' ] " ."|| " ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' " ." && ./installrevision " ." ) " .") "; } if (exists $ENV{GPG_KEYS}) { $command .= "&& mkdir -p '$ENV{MR_JOBSTEP_TMP}' && (sudo /bin/umount '$ENV{MR_JOBSTEP_TMP}' 2>/dev/null || true) && rm -rf '$ENV{MR_JOBSTEP_TMP}' && exec $ENV{MR_REVISION_SRCDIR}/mapreduce/ecryptfs-wrapper -d '$ENV{MR_JOBSTEP_TMP}' -p $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager"; } else { $command .= "&& exec $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager"; } my @execargs = ('bash', '-c', $command); srun (\@srunargs, \@execargs, undef, $script); exit (1); } close("writer"); if (!defined $childpid) { close $reader{$id}; delete $reader{$id}; next; } shift @freeslot; $proc{$childpid} = { jobstep => $id, time => time, slot => $childslot, jobstepname => "$job_id.$id.$childpid", }; croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid}; $slot[$childslot]->{pid} = $childpid; Log ($id, "child $childpid started on $childslotname"); $Jobstep->{attempts} ++; $Jobstep->{starttime} = time; $Jobstep->{node} = $childnode->{name}; $Jobstep->{slotindex} = $childslot; delete $Jobstep->{stderr}; delete $Jobstep->{output}; delete $Jobstep->{finishtime}; splice @jobstep_todo, $todo_ptr, 1; --$todo_ptr; $progress_is_dirty = 1; while (!@freeslot || (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo)) { last THISROUND if $main::please_freeze; if ($main::please_info) { $main::please_info = 0; freeze(); collate_output(); save_meta(1); update_progress_stats(); } my $gotsome = readfrompipes () + reapchildren (); if (!$gotsome) { check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); } elsif (time - $progress_stats_updated >= 30) { update_progress_stats(); } if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) || ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded)) { my $message = "Repeated failure rate too high ($thisround_failed_multiple/" .($thisround_failed+$thisround_succeeded) .") -- giving up on this round"; Log (undef, $message); last THISROUND; } # move slots from freeslot to holdslot (or back to freeslot) if necessary for (my $i=$#freeslot; $i>=0; $i--) { if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) { push @holdslot, (splice @freeslot, $i, 1); } } for (my $i=$#holdslot; $i>=0; $i--) { if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) { push @freeslot, (splice @holdslot, $i, 1); } } # give up if no nodes are succeeding if (!grep { $_->{node}->{losing_streak} == 0 } @slot) { my $message = "Every node has failed -- giving up on this round"; Log (undef, $message); last THISROUND; } } } push @freeslot, splice @holdslot; map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot); Log (undef, "wait for last ".(scalar keys %proc)." children to finish"); while (%proc) { goto THISROUND if $main::please_continue; $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info; readfrompipes (); if (!reapchildren()) { check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); killem (keys %proc) if $main::please_freeze; } } update_progress_stats(); freeze_if_want_freeze(); if (@jobstep_tomerge && !@jobstep_todo) { push @jobstep, { input => join ("\n", splice @jobstep_tomerge, 0), level => $jobstep_tomerge_level, attempts => 0 }; push @jobstep_todo, $#jobstep; } if (!defined $success) { if (@jobstep_todo && $thisround_succeeded == 0 && ($thisround_failed == 0 || $thisround_failed > 4)) { my $message = "stop because $thisround_failed tasks failed and none succeeded"; Log (undef, $message); $success = 0; } if (!@jobstep_todo) { $success = 1; } } goto ONELEVEL if !defined $success; release_allocation(); freeze(); my $key = &collate_output(); $success = 0 if !$key; if ($key) { my @keepkey; foreach my $hash (split ",", $key) { my $keephash = $whc->store_in_keep (hash => $hash, nnodes => 3); if (!$keephash) { Log (undef, "store_in_keep (\"$hash\") failed: ".$whc->errstr); $keephash = $hash; } push @keepkey, $keephash; } my $keepkey = join (",", @keepkey); Log (undef, "outputkey+K $keepkey"); print "$keepkey\n" if $success; if ($output_in_keep) { $key = $keepkey; } dbh_do ("update mrjob set output=? where id=?", undef, $key, $job_id) or croak ($dbh->errstr); $whc->store_manifest_by_name ($keepkey, undef, "/job$job_id") or Log (undef, "store_manifest_by_name (\"$key\", \"/job$job_id\") failed: ".$whc->errstr); } Log (undef, "finish"); dbh_do ("update mrjob set finishtime=now(), success=? where id=? and jobmanager_id=?", undef, $success, $job_id, $jobmanager_id) or croak ($dbh->errstr); save_meta(); exit 0; sub update_progress_stats { $progress_stats_updated = time; return if !$progress_is_dirty; my ($todo, $done, $running) = (scalar @jobstep_todo, scalar @jobstep_done, scalar @slot - scalar @freeslot - scalar @holdslot); dbh_do ("update mrjob set steps_todo=?,steps_done=?,steps_running=? where id=?", undef, $todo, $done, $running, $job_id); Log (undef, "status: $done done, $running running, $todo todo"); $progress_is_dirty = 0; } sub reapchildren { my $pid = waitpid (-1, WNOHANG); return 0 if $pid <= 0; my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} . "." . $slot[$proc{$pid}->{slot}]->{cpu}); my $jobstepid = $proc{$pid}->{jobstep}; my $elapsed = time - $proc{$pid}->{time}; my $Jobstep = $jobstep[$jobstepid]; process_stderr_for_output_key ($jobstepid); my $exitcode = $?; my $exitinfo = "exit $exitcode"; if (!exists $Jobstep->{output}) { $exitinfo .= " with no output key"; $exitcode = -1 if $exitcode == 0 && $jobsteps_must_output_keys; } if ($exitcode == 0 && $Jobstep->{node_fail}) { $exitinfo .= " but recording as failure"; $exitcode = -1; } Log ($jobstepid, "child $pid on $whatslot $exitinfo"); if ($exitcode != 0 || $Jobstep->{node_fail}) { --$Jobstep->{attempts} if $Jobstep->{node_fail}; ++$thisround_failed; ++$thisround_failed_multiple if $Jobstep->{attempts} > 1; # Check for signs of a failed or misconfigured node if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { # Don't count this against jobstep failure thresholds if this # node is already suspected faulty and srun exited quickly if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && $elapsed < 5 && $Jobstep->{attempts} > 1) { Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts"); --$Jobstep->{attempts}; } ban_node_by_slot($proc{$pid}->{slot}); } push @jobstep_todo, $jobstepid; Log ($jobstepid, "failure in $elapsed seconds"); } else { ++$thisround_succeeded; $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; push @jobstep_done, $jobstepid; Log ($jobstepid, "success in $elapsed seconds"); } $Jobstep->{exitcode} = $exitcode; $Jobstep->{finishtime} = time; process_stderr ($jobstepid, $exitcode == 0); Log ($jobstepid, "output $$Jobstep{output}"); close $reader{$jobstepid}; delete $reader{$jobstepid}; delete $slot[$proc{$pid}->{slot}]->{pid}; push @freeslot, $proc{$pid}->{slot}; delete $proc{$pid}; $progress_is_dirty = 1; 1; } sub check_squeue { # return if the kill list was checked <4 seconds ago if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4) { return; } $squeue_kill_checked = time; # use killem() on procs whose killtime is reached for (keys %proc) { if (exists $proc{$_}->{killtime} && $proc{$_}->{killtime} <= time) { killem ($_); } } # return if the squeue was checked <60 seconds ago if (defined $squeue_checked && $squeue_checked > time - 60) { return; } $squeue_checked = time; if (!$have_slurm) { # here is an opportunity to check for mysterious problems with local procs return; } # get a list of steps still running my @squeue = `squeue -s -h -o '%i %j' && echo ok`; chop @squeue; if ($squeue[-1] ne "ok") { return; } pop @squeue; # which of my jobsteps are running, according to squeue? my %ok; foreach (@squeue) { if (/^(\d+)\.(\d+) (\S+)/) { if ($1 eq $ENV{SLURM_JOBID}) { $ok{$3} = 1; } } } # which of my active child procs (>60s old) were not mentioned by squeue? foreach (keys %proc) { if ($proc{$_}->{time} < time - 60 && !exists $ok{$proc{$_}->{jobstepname}} && !exists $proc{$_}->{killtime}) { # kill this proc if it hasn't exited in 30 seconds $proc{$_}->{killtime} = time + 30; } } } sub release_allocation { if ($have_slurm) { Log (undef, "release job allocation"); system "scancel $ENV{SLURM_JOBID}"; } } sub readfrompipes { my $gotsome = 0; foreach my $job (keys %reader) { my $buf; while (0 < sysread ($reader{$job}, $buf, 8192)) { print STDERR $buf if $ENV{MR_DEBUG}; $jobstep[$job]->{stderr} .= $buf; preprocess_stderr ($job); if (length ($jobstep[$job]->{stderr}) > 16384 && $jobstep[$job]->{stderr} !~ /\+\+\+mr/) { substr ($jobstep[$job]->{stderr}, 0, 8192) = ""; } $gotsome = 1; } } return $gotsome; } sub process_stderr_for_output_key { my $job = shift; while ($jobstep[$job]->{stderr} =~ s/\+\+\+mrout (.*?)\+\+\+\n//s) { $jobstep[$job]->{output} = $1; $jobsteps_must_output_keys = 1; } } sub preprocess_stderr { my $job = shift; $jobstep[$job]->{stderr_jobsteps} = [] if !exists $jobstep[$job]->{stderr_jobsteps}; $jobstep[$job]->{stderr} =~ s{\+\+\+mrjobstep((\/(\d+|\*))? (\d+) (.*?))\+\+\+\n}{ push (@{ $jobstep[$job]->{stderr_jobsteps} }, $1); ""; }gse; while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) { my $line = $1; if ($line =~ /\+\+\+mr/) { last; } substr $jobstep[$job]->{stderr}, 0, 1+length($line), ""; Log ($job, "stderr $line"); if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) { # whoa. $main::please_freeze = 1; } elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) { $jobstep[$job]->{node_fail} = 1; ban_node_by_slot($jobstep[$job]->{slotindex}); } } } sub process_stderr { my $job = shift; my $success = shift; preprocess_stderr ($job); map { Log ($job, "stderr $_"); } split ("\n", $jobstep[$job]->{stderr}); if (!$success || !exists $jobstep[$job]->{stderr_jobsteps}) { delete $jobstep[$job]->{stderr_jobsteps}; return; } foreach (@{ $jobstep[$job]->{stderr_jobsteps} }) { /^(?:\/(\d+|\*))? (\d+) (.*)/s; my ($merge, $level, $input) = ($1, $2, $3); my $newjobref; if ($merge) { push @jobstep_tomerge, $input; $jobstep_tomerge_level = $level; if ($merge !~ /\D/ && @jobstep_tomerge >= $merge) { $newjobref = { input => join ("\n", splice @jobstep_tomerge, 0, $merge), level => $level, attempts => 0 }; } } else { $newjobref = { input => $input, level => $level, attempts => 0 }; } if ($newjobref) { push @jobstep, $newjobref; push @jobstep_todo, $#jobstep; } } delete $jobstep[$job]->{stderr_jobsteps}; } sub collate_output { Log (undef, "collate"); $whc->write_start (1); my $key; for (@jobstep) { next if !exists $_->{output} || $_->{exitcode} != 0; my $output = $_->{output}; if ($output !~ /^[0-9a-f]{32}/) { $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/; $whc->write_data ($output); } elsif (@jobstep == 1) { $key = $output; $whc->write_finish; } elsif (defined (my $outblock = $whc->fetch_block ($output))) { $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/; $whc->write_data ($outblock); } else { my $errstr = $whc->errstr; $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n"); $success = 0; } } $key = $whc->write_finish if !defined $key; if ($key) { Log (undef, "outputkey $key"); dbh_do ("update mrjob set output=? where id=?", undef, $key, $job_id) or Log (undef, "db update failed: ".$DBI::errstr); } else { Log (undef, "outputkey undef"); } return $key; } sub killem { foreach (@_) { my $sig = 2; # SIGINT first if (exists $proc{$_}->{"sent_$sig"} && time - $proc{$_}->{"sent_$sig"} > 4) { $sig = 15; # SIGTERM if SIGINT doesn't work } if (exists $proc{$_}->{"sent_$sig"} && time - $proc{$_}->{"sent_$sig"} > 4) { $sig = 9; # SIGKILL if SIGTERM doesn't work } if (!exists $proc{$_}->{"sent_$sig"}) { Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_"); kill $sig, $_; select (undef, undef, undef, 0.1); if ($sig == 2) { kill $sig, $_; # srun wants two SIGINT to really interrupt } $proc{$_}->{"sent_$sig"} = time; $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"}; } } } sub fhbits { my($bits); for (@_) { vec($bits,fileno($_),1) = 1; } $bits; } sub Log # ($jobstep_id, $logmessage) { if ($_[1] =~ /\n/) { for my $line (split (/\n/, $_[1])) { Log ($_[0], $line); } return; } my $fh = select STDERR; $|=1; select $fh; my $message = sprintf ("%s %d %s %s", $job_id, $$, @_); $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge; $message .= "\n"; my $datetime; if ($metastream || -t STDERR) { my @gmtime = gmtime; $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]); } print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message); return if !$metastream; $metastream->write_data ($datetime . " " . $message); } sub reconnect_database { return if !$have_database; return if ($dbh && $dbh->do ("select now()")); for (1..16) { $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN); if ($dbh) { $dbh->{InactiveDestroy} = 1; return; } warn ($DBI::errstr); sleep $_; } croak ($DBI::errstr) if !$dbh; } sub dbh_do { return 1 if !$have_database; my $ret = $dbh->do (@_); return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/); reconnect_database(); return $dbh->do (@_); } sub croak { my ($package, $file, $line) = caller; my $message = "@_ at $file line $line\n"; Log (undef, $message); freeze() if @jobstep_todo; collate_output() if @jobstep_todo; cleanup(); save_meta() if $metastream; die; } sub cleanup { return if !$have_database || !$dbh; reconnect_database(); my $sth; $sth = $dbh->prepare ("update mrjobmanager set finishtime=now() where id=?"); $sth->execute ($jobmanager_id); $sth = $dbh->prepare ("update mrjob set success=0, finishtime=now() where id=? and jobmanager_id=? and finishtime is null"); $sth->execute ($job_id, $jobmanager_id); } sub save_meta { reconnect_database(); my $justcheckpoint = shift; # false if this will be the last meta saved my $m = $metastream; $m = $m->copy if $justcheckpoint; $m->write_finish; my $key = $m->as_key; undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it Log (undef, "meta key is $key"); dbh_do ("update mrjob set metakey=? where id=?", undef, $key, $job_id); } sub freeze_if_want_freeze { if ($main::please_freeze) { release_allocation(); if (@_) { # kill some srun procs before freeze+stop map { $proc{$_} = {} } @_; while (%proc) { killem (keys %proc); select (undef, undef, undef, 0.1); my $died; while (($died = waitpid (-1, WNOHANG)) > 0) { delete $proc{$died}; } } } freeze(); collate_output(); cleanup(); save_meta(); exit 0; } } sub freeze { Log (undef, "freeze"); my $freezer = new Warehouse::Stream (whc => $whc); $freezer->clear; $freezer->name ("."); $freezer->write_start ("state.txt"); $freezer->write_data (join ("\n", "job $Job->{id}", map { $_ . "=" . freezequote($Job->{$_}) } grep { $_ ne "id" } keys %$Job) . "\n\n"); foreach my $Jobstep (@jobstep) { my $str = join ("\n", map { $_ . "=" . freezequote ($Jobstep->{$_}) } grep { $_ !~ /^stderr|slotindex|node_fail/ } keys %$Jobstep); $freezer->write_data ($str."\n\n"); } if (@jobstep_tomerge) { $freezer->write_data ("merge $jobstep_tomerge_level " . freezequote (join ("\n", map { freezequote ($_) } @jobstep_tomerge)) . "\n\n"); } $freezer->write_finish; my $frozentokey = $freezer->as_key; undef $freezer; Log (undef, "frozento key is $frozentokey"); dbh_do ("update mrjob set frozentokey=? where id=?", undef, $frozentokey, $job_id); my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3); Log (undef, "frozento+K key is $kfrozentokey"); return $frozentokey; } sub thaw { my $key = shift; Log (undef, "thaw from $key"); @jobstep = (); @jobstep_done = (); @jobstep_todo = (); @jobstep_tomerge = (); $jobstep_tomerge_level = 0; my $frozenjob = {}; my $stream = new Warehouse::Stream ( whc => $whc, hash => [split (",", $key)] ); $stream->rewind; while (my $dataref = $stream->read_until (undef, "\n\n")) { if ($$dataref =~ /^job /) { foreach (split ("\n", $$dataref)) { my ($k, $v) = split ("=", $_, 2); $frozenjob->{$k} = freezeunquote ($v); } next; } if ($$dataref =~ /^merge (\d+) (.*)/) { $jobstep_tomerge_level = $1; @jobstep_tomerge = map { freezeunquote ($_) } split ("\n", freezeunquote($2)); next; } my $Jobstep = { }; foreach (split ("\n", $$dataref)) { my ($k, $v) = split ("=", $_, 2); $Jobstep->{$k} = freezeunquote ($v) if $k; } $Jobstep->{attempts} = 0; push @jobstep, $Jobstep; if ($Jobstep->{exitcode} eq "0") { push @jobstep_done, $#jobstep; } else { push @jobstep_todo, $#jobstep; } } foreach (qw (mrfunction revision inputkey knobs)) { $Job->{$_} = $frozenjob->{$_}; } dbh_do ("update mrjob set mrfunction=?, revision=?, input0=?, knobs=? where id=?", undef, $Job->{mrfunction}, $Job->{revision}, $Job->{inputkey}, $Job->{knobs}, $Job->{id}, ); } sub freezequote { my $s = shift; $s =~ s/\\/\\\\/g; $s =~ s/\n/\\n/g; return $s; } sub freezeunquote { my $s = shift; $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge; return $s; } sub srun { my $srunargs = shift; my $execargs = shift; my $opts = shift || {}; my $stdin = shift; my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs; print STDERR (join (" ", map { / / ? "'$_'" : $_ } (@$args)), "\n") if $ENV{MR_DEBUG}; if (defined $stdin) { my $child = open STDIN, "-|"; defined $child or die "no fork: $!"; if ($child == 0) { print $stdin or die $!; close STDOUT or die $!; exit 0; } } return system (@$args) if $opts->{fork}; exec @$args; warn "ENV size is ".length(join(" ",%ENV)); die "exec failed: $!: @$args"; } sub ban_node_by_slot { # Don't start any new jobsteps on this node for 60 seconds my $slotid = shift; $slot[$slotid]->{node}->{hold_until} = 60 + scalar time; Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds"); }