From 9161bdf86199b318267852f472c69bc13e6267aa Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Tue, 14 May 2013 12:52:39 -0700
Subject: [PATCH] copy whjobmanager from warehouse-apps

---
 services/crunch/crunch-job | 1551 ++++++++++++++++++++++++++++++++++++
 1 file changed, 1551 insertions(+)
 create mode 100755 services/crunch/crunch-job

diff --git a/services/crunch/crunch-job b/services/crunch/crunch-job
new file mode 100755
index 0000000000..e884a449c4
--- /dev/null
+++ b/services/crunch/crunch-job
@@ -0,0 +1,1551 @@
+#!/usr/bin/perl
+# -*- mode: perl; perl-indent-level: 2; -*-
+
+=head1 NAME
+
+whjobmanager: Execute job steps, save snapshots as requested, collate output.
+
+=head1 SYNOPSIS
+
+Obtain job details from database, run tasks on compute nodes
+(typically invoked by scheduler on cloud controller):
+
+ whjobmanager jobid
+
+Obtain job details from command line, run tasks on local machine
+(typically invoked by application or developer on VM):
+
+ whjobmanager revision=PATH mrfunction=FUNC inputkey=LOCATOR \
+     [stepspernode=N] [SOMEKNOB=value] ...
+
+=head1 RUNNING JOBS LOCALLY
+
+whjobmanager(1p)'s log messages appear on stderr, and are saved in the
+warehouse at each checkpoint and when the job finishes.
+
+If the job succeeds, the job's output locator is printed on stdout.
+
+If a job step outputs anything to stderr, it appears in
+whjobmanager(1p)'s log when the step finishes.
+
+While the job is running, the following signals are accepted:
+
+=over
+
+=item control-C, SIGINT, SIGQUIT
+
+Save a checkpoint, terminate any job steps that are running, and stop.
+
+=item SIGALRM
+
+Save a checkpoint and continue.
+
+=back
+
+=head1 SEE ALSO
+
+whintro(1p), wh(1p)
+
+=cut
+
+
+use strict;
+use DBI;
+use POSIX ':sys_wait_h';
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Warehouse;
+use Warehouse::Stream;
+
+$ENV{"TMPDIR"} ||= "/tmp";
+
+do '/etc/warehouse/warehouse-server.conf';
+
+my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
+my $have_database = @ARGV == 1 && $ARGV[0] =~ /^\d+$/;
+
+
+$SIG{'USR1'} = sub
+{
+  $main::ENV{MR_DEBUG} = 1;
+};
+$SIG{'USR2'} = sub
+{
+  $main::ENV{MR_DEBUG} = 0;
+};
+
+
+
+my $whc = new Warehouse or croak ("failed to create Warehouse client");
+my $metastream = new Warehouse::Stream (whc => $whc);
+$metastream->clear;
+$metastream->name (".");
+$metastream->write_start ("log.txt");
+
+
+
+my $Job = {};
+my $job_id;
+my $dbh;
+my $sth;
+if ($have_database)
+{
+  ($job_id) = @ARGV;
+
+  $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
+  croak ($DBI::errstr) if !$dbh;
+  $dbh->{InactiveDestroy} = 1;
+
+  $sth = $dbh->prepare ("select * from mrjob where id=?");
+  $sth->execute ($job_id) or croak ($dbh->errstr);
+  $Job = $sth->fetchrow_hashref or croak ($sth->errstr);
+}
+
+else
+{
+  my %knob;
+  foreach (@ARGV)
+  {
+    if (/([a-z].*?)=(.*)/) {
+      $Job->{$1} = $2;
+    } elsif (/(.*?)=(.*)/) {
+      $knob{$1} = $2;
+    }
+  }
+  $Job->{knobs} = join ("\n", map { "$_=$knob{$_}" } sort keys %knob);
+
+  if (!$Job->{thawedfromkey})
+  {
+    map { croak ("No $_ specified") unless $Job->{$_} }
+    qw(mrfunction revision inputkey);
+  }
+
+  if (!defined $Job->{id}) {
+    chomp ($Job->{id} = sprintf ("%d.%d\@%s", time, $$, `hostname`));
+  }
+  $job_id = $Job->{id};
+}
+
+
+
+$Job->{inputkey} = $Job->{input0} if !exists $Job->{inputkey};
+delete $Job->{input0};
+
+
+
+my $max_ncpus;
+map { $max_ncpus = $1 if /^STEPSPERNODE=(.*)/ } split ("\n", $$Job{knobs});
+$max_ncpus = $1 if $$Job{nodes} =~ /\&(\d+)/;
+$max_ncpus = $$Job{stepspernode} if $$Job{stepspernode};
+my 
$maxstepspernode; + + + +Log (undef, "check slurm allocation"); +my @slot; +my @node; +# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)") +my @sinfo; +if (!$have_slurm) +{ + my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1; + push @sinfo, "$localcpus localhost"; +} +if (exists $ENV{SLURM_NODELIST}) +{ + push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`; +} +foreach (@sinfo) +{ + my ($ncpus, $slurm_nodelist) = split; + $ncpus = $max_ncpus if defined ($max_ncpus) && $ncpus > $max_ncpus && $max_ncpus > 0; + $maxstepspernode = $ncpus if !defined $maxstepspernode || $maxstepspernode < $ncpus; + + my @nodelist; + while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//) + { + my $nodelist = $1; + if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/) + { + my $ranges = $1; + foreach (split (",", $ranges)) + { + my ($a, $b); + if (/(\d+)-(\d+)/) + { + $a = $1; + $b = $2; + } + else + { + $a = $_; + $b = $_; + } + push @nodelist, map { + my $n = $nodelist; + $n =~ s/\[[-,\d]+\]/$_/; + $n; + } ($a..$b); + } + } + else + { + push @nodelist, $nodelist; + } + } + foreach my $nodename (@nodelist) + { + Log (undef, "node $nodename - $ncpus slots"); + my $node = { name => $nodename, + ncpus => $ncpus, + losing_streak => 0, + hold_until => 0 }; + foreach my $cpu (1..$ncpus) + { + push @slot, { node => $node, + cpu => $cpu }; + } + } + push @node, @nodelist; +} + + + +# Ensure that we get one jobstep running on each allocated node before +# we start overloading nodes with concurrent steps + +@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot; + + + +my $jobmanager_id; +if ($have_database) +{ + # Claim this job, and make sure nobody else does + + $sth = $dbh->prepare ("insert into mrjobmanager + (pid, revision, starttime) + values (?, ?, now())"); + my $rev = q/$Revision$/; + $rev =~ /\d+/; + $sth->execute ($$, +$&) or croak ($dbh->errstr); + + $sth = $dbh->prepare ("select last_insert_id()"); + $sth->execute or croak ($dbh->errstr); + ($jobmanager_id) = $sth->fetchrow_array; + + $sth = $dbh->prepare ("update mrjob set jobmanager_id=?, starttime=now() + where id=? 
and jobmanager_id is null"); + $sth->execute ($jobmanager_id, $job_id) or croak ($dbh->errstr); + + $sth = $dbh->prepare ("select jobmanager_id from mrjob + where id=?"); + $sth->execute ($job_id) or croak ($dbh->errstr); + my ($check_jobmanager_id) = $sth->fetchrow_array; + if ($check_jobmanager_id != $jobmanager_id) + { + # race condition - another job manager proc stole the job + Log (undef, + "job taken by jobmanager id $check_jobmanager_id"); + exit (1); + } +} + + +Log (undef, "start"); +$SIG{'INT'} = sub { $main::please_freeze = 1; }; +$SIG{'QUIT'} = sub { $main::please_freeze = 1; }; +$SIG{'TERM'} = \&croak; +$SIG{'TSTP'} = sub { $main::please_freeze = 1; }; +$SIG{'ALRM'} = sub { $main::please_info = 1; }; +$SIG{'CONT'} = sub { $main::please_continue = 1; }; +$main::please_freeze = 0; +$main::please_info = 0; +$main::please_continue = 0; +my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key + +grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs}); +$ENV{"MR_JOB_ID"} = $job_id; +$ENV{"JOB_UUID"} = $job_id; + + +my @jobstep; +my @jobstep_todo = (); +my @jobstep_done = (); +my @jobstep_tomerge = (); +my $jobstep_tomerge_level = 0; +my $squeue_checked; +my $squeue_kill_checked; +my $output_in_keep = 0; + + + +if (defined $Job->{thawedfromkey}) +{ + thaw ($Job->{thawedfromkey}); +} +else +{ + push @jobstep, { input => $Job->{inputkey}, + level => 0, + attempts => 0, + }; + push @jobstep_todo, 0; +} + + + +mkdir ($ENV{"TMPDIR"}."/mrcompute"); +if ($$Job{knobs} =~ /^GPG_KEYS=(.*)/m) { + # set up a fresh gnupg directory just for this process + # TODO: reap abandoned gnupg dirs + system ("rm", "-rf", $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$"); + mkdir ($ENV{"TMPDIR"}."/mrcompute"); + mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg", 0700); + mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg/$$", 0700) || croak ("mkdir: $!"); + + my $newhomedir = $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$"; + + open C, ">", $newhomedir."/gpg.conf"; + print C "always-trust\n"; + close C; + + # import secret keys referenced in job spec + my $hashes = $1; + $hashes =~ s/\'/\'\\\'\'/g; + my $gpg_out = `whget '$hashes' - | gpg --homedir "$newhomedir" --import 2>&1`; + my %encrypt_to; + while ($gpg_out =~ /^gpg: key ([0-9A-F]{8}): /gm) { + my $keynum = $1; + while (`gpg --homedir "$newhomedir" --list-keys "$keynum"` =~ /^uid\s.*<(.+?)>/gm) { + $encrypt_to{$1} = 1; + } + } + if (!%encrypt_to) { + croak ("GPG_KEYS provided but failed to import keys:\n$gpg_out"); + } + + if ($have_database) { + + # make sure the job request was signed by all of the secret keys + # contained in GPG_KEYS (otherwise, any VM can just copy the + # GPG_KEYS hash from an existing mr-job and submit new jobs that can + # read private data) + + my %did_not_sign; + my $seckeys = `gpg --homedir "$newhomedir" --list-secret-keys --with-fingerprint`; + while ($seckeys =~ /Key fingerprint.*?([0-9A-F][0-9A-F ]+[0-9A-F])/mgi) { + $did_not_sign{$1} = 1; + } + my $srfile = "$newhomedir/signedrequest"; + open SREQ, ">", $srfile; + print SREQ $$Job{"signedrequest"}; + close SREQ; + my $gpg_v = `gpg --homedir "$newhomedir" --verify --with-fingerprint "$srfile" 2>&1 && echo ok`; + unlink $srfile; + if ($gpg_v =~ /\nok\n$/s) { + while ($gpg_v =~ /Good signature.*? key fingerprint: (\S[^\n]+\S)/sgi) { + delete $did_not_sign{$1}; + } + } + if (%did_not_sign) { + croak (join ("\n", + "Some secret keys provided did not sign this job request:", + keys %did_not_sign) . 
"\n"); + } + } + + my $hostname = `hostname`; + chomp ($hostname); + + # tell mrjobsteps the decrypted secret key(s) and all public key(s) they might need + $ENV{"GPG_KEYS"} = `gpg --homedir "$newhomedir" --export-secret-keys --armor`; + $ENV{"GPG_PUBLIC_KEYS"} = `gpg --export --armor | ENCRYPT_TO= whput -`; + + # import all secret keys from my real home dir + `gpg --export-secret-keys | gpg --homedir "$newhomedir" --import 2>&1`; + + # use the new gnupg dir from now on + $ENV{"GNUPGHOME"} = $newhomedir; + + # if I have a secret key for root@{host} or {user}@{host} or + # {configured-controller-gpg-uid}, add that as a recipient too so + # I'll be able to read frozentokeys etc. later + my %allkeys; + while (`gpg --list-secret-keys` =~ /^uid\s.*?<(.+?)>/gm) { + $allkeys{$1} = 1; + } + my $encrypting_to_self = 0; + my @try_these_uids = ("root\@".$hostname, $ENV{"USER"}."\@".$hostname); + push @try_these_uids, ($whc->{config}->{controller_gpg_uid}) + if exists $whc->{config}->{controller_gpg_uid}; + foreach my $id (@try_these_uids) { + if (exists $allkeys{$id}) { + $encrypt_to{$id} = 1; + $encrypting_to_self = 1; + last; + } + } + + if (!$encrypting_to_self) { + croak ("Failed to find a secret key for any of [@try_these_uids] -- giving up instead of writing meta/freeze data that I won't be able to read"); + } + + # tell the client library (and child procs and jobsteps) to encrypt using these keys + $ENV{"ENCRYPT_TO"} = join (",", sort keys %encrypt_to); + Log (undef, "encrypt_to ('".$ENV{"ENCRYPT_TO"}."')"); + $whc->set_config ("encrypt_to", $ENV{"ENCRYPT_TO"}); +} + + + +$ENV{"MR_REVISION"} = $Job->{revision}; + +my $git_build_script; +my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/}); +if ($skip_install) +{ + $ENV{"MR_REVISION_SRCDIR"} = $Job->{revision}; +} +else +{ + Log (undef, "Install revision ".$Job->{revision}); + my $nodelist = join(",", @node); + + # Clean out mrcompute/work and mrcompute/opt + + my $cleanpid = fork(); + if ($cleanpid == 0) + { + srun (["srun", "--nodelist=$nodelist", "-D", $ENV{TMPDIR}], + ['bash', '-c', 'if mount | grep -q $TMPDIR/mrcompute/work/; then sudo /bin/umount $TMPDIR/mrcompute/work/* 2>/dev/null; fi; sleep 1; rm -rf $TMPDIR/mrcompute/work $TMPDIR/mrcompute/opt']); + exit (1); + } + while (1) + { + last if $cleanpid == waitpid (-1, WNOHANG); + freeze_if_want_freeze ($cleanpid); + select (undef, undef, undef, 0.1); + } + Log (undef, "Clean-work-dir exited $?"); + + # Install requested code version + + my $build_script; + my @execargs; + my @srunargs = ("srun", + "--nodelist=$nodelist", + "-D", $ENV{TMPDIR}, "--job-name=$job_id"); + + $ENV{"MR_REVISION"} = $Job->{revision}; + $ENV{"MR_REVISION_SRCDIR"} = "$ENV{TMPDIR}/mrcompute/warehouse-apps"; + $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/opt"; + + my $commit; + my $treeish = $Job->{revision}; + my $repo = $Job->{git_clone_url} || $whc->get_config("git_clone_url"); + + # Create/update our clone of the remote git repo + + if (!-d $ENV{MR_REVISION_SRCDIR}) { + system(qw(git clone), $repo, $ENV{MR_REVISION_SRCDIR}) == 0 + or croak ("git clone $repo failed: exit ".($?>>8)); + system("cd $ENV{MR_REVISION_SRCDIR} && git config clean.requireForce false"); + } + `cd $ENV{MR_REVISION_SRCDIR} && git fetch -q`; + + # If this looks like a subversion r#, look for it in git-svn commit messages + + if ($treeish =~ m{^\d{1,4}$}) { + my $gitlog = `cd $ENV{MR_REVISION_SRCDIR} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`; + chomp $gitlog; + if ($gitlog =~ 
/^[a-f0-9]{40}$/) {
+      $commit = $gitlog;
+      Log (undef, "Using commit $commit for revision $treeish");
+    }
+  }
+
+  # If that didn't work, try asking git to look it up as a tree-ish.
+
+  if (!defined $commit) {
+
+    my $cooked_treeish = $treeish;
+    if ($treeish !~ m{^[0-9a-f]{5,}$}) {
+      # Looks like a git branch name -- make sure git knows it's
+      # relative to the remote repo
+      $cooked_treeish = "origin/$treeish";
+    }
+
+    my $found = `cd $ENV{MR_REVISION_SRCDIR} && git rev-list -1 $cooked_treeish`;
+    chomp $found;
+    if ($found =~ /^[0-9a-f]{40}$/s) {
+      $commit = $found;
+      if ($commit ne $treeish) {
+        # Make sure we record the real commit id in the database,
+        # frozentokey, logs, etc. -- instead of an abbreviation or a
+        # branch name which can become ambiguous or point to a
+        # different commit in the future.
+        $ENV{"MR_REVISION"} = $commit;
+        $Job->{revision} = $commit;
+        dbh_do
+            ("update mrjob set revision=? where id=?",
+             undef,
+             $Job->{revision}, $Job->{id});
+        Log (undef, "Using commit $commit for tree-ish $treeish");
+      }
+    }
+  }
+
+  if (defined $commit) {
+    $ENV{"MR_GIT_COMMIT"} = $commit;
+    $ENV{"MR_GIT_CLONE_URL"} = $repo;
+    @execargs = ("sh", "-c",
+                 "mkdir -p $ENV{TMPDIR}/mrcompute/opt && cd $ENV{TMPDIR}/mrcompute && perl - $ENV{MR_REVISION_SRCDIR} $commit $repo");
+    open GBS, "<", `echo -n \$(which whjob-checkout-and-build)`
+        or croak ("can't find whjob-checkout-and-build");
+    local $/ = undef;
+    $git_build_script = <GBS>;
+    close GBS;
+    $build_script = $git_build_script;
+  }
+  elsif ($treeish =~ m{^(\d{1,5})$}) {
+    # Want a subversion r# but couldn't find it in git-svn history -
+    # might as well try using the subversion repo in case it's still
+    # there.
+    $ENV{"INSTALL_REPOS"} = $whc->get_config("svn_root");
+    $ENV{"INSTALL_REVISION"} = $Job->{revision};
+    $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/revision/$treeish";
+    $ENV{"MR_REVISION_SRCDIR"} = "$ENV{MR_REVISION_INSTALLDIR}/src";
+    @execargs = ("sh", "-c",
+                 "mkdir -p $ENV{TMPDIR}/mrcompute/revision && cd $ENV{TMPDIR}/mrcompute && ( [ -e $ENV{MR_REVISION_INSTALLDIR}/.tested ] || ( svn export --quiet \"\$INSTALL_REPOS/installrevision\" && INSTALLREVISION_NOLOCK=1 ./installrevision ) )");
+  }
+  else {
+    croak ("could not figure out commit id for $treeish");
+  }
+
+  my $installpid = fork();
+  if ($installpid == 0)
+  {
+    srun (\@srunargs, \@execargs, {}, $build_script);
+    exit (1);
+  }
+  while (1)
+  {
+    last if $installpid == waitpid (-1, WNOHANG);
+    freeze_if_want_freeze ($installpid);
+    select (undef, undef, undef, 0.1);
+  }
+  Log (undef, "Install exited $?");
+}
+
+
+
+foreach (qw (mrfunction revision nodes stepspernode inputkey))
+{
+  Log (undef, $_ . " " . $Job->{$_});
+}
+foreach (split (/\n/, $Job->{knobs}))
+{
+  Log (undef, "knob " . 
$_); +} + + + +my $success; + + + +ONELEVEL: + +my $thisround_succeeded = 0; +my $thisround_failed = 0; +my $thisround_failed_multiple = 0; + +@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level} + or $a <=> $b } @jobstep_todo; +my $level = $jobstep[$jobstep_todo[0]]->{level}; +Log (undef, "start level $level"); + + + +my %proc; +my @freeslot = (0..$#slot); +my @holdslot; +my %reader; +my ($id, $input, $attempts); +my $progress_is_dirty = 1; +my $progress_stats_updated = 0; + +update_progress_stats(); + + + +THISROUND: +for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) +{ + $main::please_continue = 0; + + my $id = $jobstep_todo[$todo_ptr]; + my $Jobstep = $jobstep[$id]; + if ($Jobstep->{level} != $level) + { + next; + } + if ($Jobstep->{attempts} > 9) + { + Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up"); + $success = 0; + last THISROUND; + } + + pipe $reader{$id}, "writer" or croak ($!); + my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!); + fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!); + + my $childslot = $freeslot[0]; + my $childnode = $slot[$childslot]->{node}; + my $childslotname = join (".", + $slot[$childslot]->{node}->{name}, + $slot[$childslot]->{cpu}); + my $childpid = fork(); + if ($childpid == 0) + { + $SIG{'INT'} = 'DEFAULT'; + $SIG{'QUIT'} = 'DEFAULT'; + $SIG{'TERM'} = 'DEFAULT'; + + foreach (values (%reader)) + { + close($_); + } + fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec + open(STDOUT,">&writer"); + open(STDERR,">&writer"); + + undef $dbh; + undef $sth; + + + delete $ENV{"GNUPGHOME"}; + $ENV{"MR_ID"} = $id; + $ENV{"MR_INPUT"} = $Jobstep->{input}; + $ENV{"MR_KNOBS"} = $Job->{knobs}; + $ENV{"MR_LEVEL"} = $level; + $ENV{"MR_FUNCTION"} = $Job->{mrfunction}; + $ENV{"MR_INPUT0"} = $Job->{inputkey}; + $ENV{"MR_INPUTKEY"} = $Job->{inputkey}; + $ENV{"MR_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; + $ENV{"MR_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; + $ENV{"MR_SLOT"} = $slot[$childslot]->{cpu}; # deprecated + $ENV{"MR_JOB_TMP"} = $ENV{"TMPDIR"}."/job/work"; + $ENV{"MR_JOBSTEP_TMP"} = $ENV{"TMPDIR"}."/job/work/".$slot[$childslot]->{cpu}; + $ENV{"MR_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; + $ENV{"MOGILEFS_TRACKERS"} = join (",", @main::mogilefs_trackers); + $ENV{"MOGILEFS_DOMAIN"} = $main::mogilefs_default_domain; + $ENV{"MOGILEFS_CLASS"} = $main::mogilefs_default_class; + + $ENV{"TASK_UUID"} = $ENV{"JOB_UUID"} . "-" . $id; + $ENV{"TASK_QSEQUENCE"} = $id; + $ENV{"TASK_SEQUENCE"} = $Jobstep->{level}; + + $ENV{"GZIP"} = "-n"; + + my @srunargs = ( + "srun", + "--nodelist=".$childnode->{name}, + qw(-n1 -c1 -N1 -D), $ENV{TMPDIR}, + "--job-name=$job_id.$id.$$", + ); + my @execargs = qw(sh); + my $script = ""; + my $command = + "mkdir -p $ENV{TMPDIR}/mrcompute/revision " + ."&& cd $ENV{TMPDIR}/mrcompute "; + if ($git_build_script) + { + $script = $git_build_script; + $command .= + "&& perl - $ENV{MR_REVISION_SRCDIR} $ENV{MR_GIT_COMMIT} $ENV{MR_GIT_CLONE_URL}"; + } + elsif (!$skip_install) + { + $command .= + "&& " + ."( " + ." [ -e '$ENV{MR_REVISION_INSTALLDIR}/.tested' ] " + ."|| " + ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' " + ." && ./installrevision " + ." 
) " + .") "; + } + if (exists $ENV{GPG_KEYS}) { + $command .= + "&& mkdir -p '$ENV{MR_JOBSTEP_TMP}' && (sudo /bin/umount '$ENV{MR_JOBSTEP_TMP}' 2>/dev/null || true) && rm -rf '$ENV{MR_JOBSTEP_TMP}' && exec $ENV{MR_REVISION_SRCDIR}/mapreduce/ecryptfs-wrapper -d '$ENV{MR_JOBSTEP_TMP}' -p $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager"; + } else { + $command .= + "&& exec $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager"; + } + my @execargs = ('bash', '-c', $command); + srun (\@srunargs, \@execargs, undef, $script); + exit (1); + } + close("writer"); + if (!defined $childpid) + { + close $reader{$id}; + delete $reader{$id}; + next; + } + shift @freeslot; + $proc{$childpid} = { jobstep => $id, + time => time, + slot => $childslot, + jobstepname => "$job_id.$id.$childpid", + }; + croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid}; + $slot[$childslot]->{pid} = $childpid; + + Log ($id, "child $childpid started on $childslotname"); + $Jobstep->{attempts} ++; + $Jobstep->{starttime} = time; + $Jobstep->{node} = $childnode->{name}; + $Jobstep->{slotindex} = $childslot; + delete $Jobstep->{stderr}; + delete $Jobstep->{output}; + delete $Jobstep->{finishtime}; + + splice @jobstep_todo, $todo_ptr, 1; + --$todo_ptr; + + $progress_is_dirty = 1; + + while (!@freeslot + || + (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo)) + { + last THISROUND if $main::please_freeze; + if ($main::please_info) + { + $main::please_info = 0; + freeze(); + collate_output(); + save_meta(1); + update_progress_stats(); + } + my $gotsome + = readfrompipes () + + reapchildren (); + if (!$gotsome) + { + check_squeue(); + update_progress_stats(); + select (undef, undef, undef, 0.1); + } + elsif (time - $progress_stats_updated >= 30) + { + update_progress_stats(); + } + if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) || + ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded)) + { + my $message = "Repeated failure rate too high ($thisround_failed_multiple/" + .($thisround_failed+$thisround_succeeded) + .") -- giving up on this round"; + Log (undef, $message); + last THISROUND; + } + + # move slots from freeslot to holdslot (or back to freeslot) if necessary + for (my $i=$#freeslot; $i>=0; $i--) { + if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) { + push @holdslot, (splice @freeslot, $i, 1); + } + } + for (my $i=$#holdslot; $i>=0; $i--) { + if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) { + push @freeslot, (splice @holdslot, $i, 1); + } + } + + # give up if no nodes are succeeding + if (!grep { $_->{node}->{losing_streak} == 0 } @slot) { + my $message = "Every node has failed -- giving up on this round"; + Log (undef, $message); + last THISROUND; + } + } +} + + +push @freeslot, splice @holdslot; +map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot); + + +Log (undef, "wait for last ".(scalar keys %proc)." 
children to finish"); +while (%proc) +{ + goto THISROUND if $main::please_continue; + $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info; + readfrompipes (); + if (!reapchildren()) + { + check_squeue(); + update_progress_stats(); + select (undef, undef, undef, 0.1); + killem (keys %proc) if $main::please_freeze; + } +} + +update_progress_stats(); +freeze_if_want_freeze(); + + +if (@jobstep_tomerge && !@jobstep_todo) +{ + push @jobstep, { input => join ("\n", splice @jobstep_tomerge, 0), + level => $jobstep_tomerge_level, + attempts => 0 }; + push @jobstep_todo, $#jobstep; +} + + +if (!defined $success) +{ + if (@jobstep_todo && + $thisround_succeeded == 0 && + ($thisround_failed == 0 || $thisround_failed > 4)) + { + my $message = "stop because $thisround_failed tasks failed and none succeeded"; + Log (undef, $message); + $success = 0; + } + if (!@jobstep_todo) + { + $success = 1; + } +} + +goto ONELEVEL if !defined $success; + + +release_allocation(); +freeze(); +my $key = &collate_output(); +$success = 0 if !$key; + + +if ($key) +{ + my @keepkey; + foreach my $hash (split ",", $key) + { + my $keephash = $whc->store_in_keep (hash => $hash, + nnodes => 3); + if (!$keephash) + { + Log (undef, "store_in_keep (\"$hash\") failed: ".$whc->errstr); + $keephash = $hash; + } + push @keepkey, $keephash; + } + my $keepkey = join (",", @keepkey); + Log (undef, "outputkey+K $keepkey"); + print "$keepkey\n" if $success; + + if ($output_in_keep) + { + $key = $keepkey; + } + + dbh_do ("update mrjob set output=? where id=?", undef, + $key, $job_id) + or croak ($dbh->errstr); + + $whc->store_manifest_by_name ($keepkey, undef, "/job$job_id") + or Log (undef, "store_manifest_by_name (\"$key\", \"/job$job_id\") failed: ".$whc->errstr); +} + + +Log (undef, "finish"); + +dbh_do ("update mrjob set finishtime=now(), success=? + where id=? and jobmanager_id=?", undef, + $success, $job_id, $jobmanager_id) + or croak ($dbh->errstr); + +save_meta(); +exit 0; + + + +sub update_progress_stats +{ + $progress_stats_updated = time; + return if !$progress_is_dirty; + my ($todo, $done, $running) = (scalar @jobstep_todo, + scalar @jobstep_done, + scalar @slot - scalar @freeslot - scalar @holdslot); + dbh_do + ("update mrjob set steps_todo=?,steps_done=?,steps_running=? where id=?", + undef, + $todo, $done, $running, $job_id); + Log (undef, "status: $done done, $running running, $todo todo"); + $progress_is_dirty = 0; +} + + + +sub reapchildren +{ + my $pid = waitpid (-1, WNOHANG); + return 0 if $pid <= 0; + + my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} + . "." + . 
$slot[$proc{$pid}->{slot}]->{cpu}); + my $jobstepid = $proc{$pid}->{jobstep}; + my $elapsed = time - $proc{$pid}->{time}; + my $Jobstep = $jobstep[$jobstepid]; + + process_stderr_for_output_key ($jobstepid); + + my $exitcode = $?; + my $exitinfo = "exit $exitcode"; + if (!exists $Jobstep->{output}) + { + $exitinfo .= " with no output key"; + $exitcode = -1 if $exitcode == 0 && $jobsteps_must_output_keys; + } + + if ($exitcode == 0 && $Jobstep->{node_fail}) { + $exitinfo .= " but recording as failure"; + $exitcode = -1; + } + + Log ($jobstepid, "child $pid on $whatslot $exitinfo"); + + if ($exitcode != 0 || $Jobstep->{node_fail}) + { + --$Jobstep->{attempts} if $Jobstep->{node_fail}; + ++$thisround_failed; + ++$thisround_failed_multiple if $Jobstep->{attempts} > 1; + + # Check for signs of a failed or misconfigured node + if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= + 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { + # Don't count this against jobstep failure thresholds if this + # node is already suspected faulty and srun exited quickly + if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && + $elapsed < 5 && + $Jobstep->{attempts} > 1) { + Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts"); + --$Jobstep->{attempts}; + } + ban_node_by_slot($proc{$pid}->{slot}); + } + + push @jobstep_todo, $jobstepid; + Log ($jobstepid, "failure in $elapsed seconds"); + } + else + { + ++$thisround_succeeded; + $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; + $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; + push @jobstep_done, $jobstepid; + Log ($jobstepid, "success in $elapsed seconds"); + } + $Jobstep->{exitcode} = $exitcode; + $Jobstep->{finishtime} = time; + process_stderr ($jobstepid, $exitcode == 0); + Log ($jobstepid, "output $$Jobstep{output}"); + + close $reader{$jobstepid}; + delete $reader{$jobstepid}; + delete $slot[$proc{$pid}->{slot}]->{pid}; + push @freeslot, $proc{$pid}->{slot}; + delete $proc{$pid}; + + $progress_is_dirty = 1; + 1; +} + + +sub check_squeue +{ + # return if the kill list was checked <4 seconds ago + if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4) + { + return; + } + $squeue_kill_checked = time; + + # use killem() on procs whose killtime is reached + for (keys %proc) + { + if (exists $proc{$_}->{killtime} + && $proc{$_}->{killtime} <= time) + { + killem ($_); + } + } + + # return if the squeue was checked <60 seconds ago + if (defined $squeue_checked && $squeue_checked > time - 60) + { + return; + } + $squeue_checked = time; + + if (!$have_slurm) + { + # here is an opportunity to check for mysterious problems with local procs + return; + } + + # get a list of steps still running + my @squeue = `squeue -s -h -o '%i %j' && echo ok`; + chop @squeue; + if ($squeue[-1] ne "ok") + { + return; + } + pop @squeue; + + # which of my jobsteps are running, according to squeue? + my %ok; + foreach (@squeue) + { + if (/^(\d+)\.(\d+) (\S+)/) + { + if ($1 eq $ENV{SLURM_JOBID}) + { + $ok{$3} = 1; + } + } + } + + # which of my active child procs (>60s old) were not mentioned by squeue? 
+ foreach (keys %proc) + { + if ($proc{$_}->{time} < time - 60 + && !exists $ok{$proc{$_}->{jobstepname}} + && !exists $proc{$_}->{killtime}) + { + # kill this proc if it hasn't exited in 30 seconds + $proc{$_}->{killtime} = time + 30; + } + } +} + + +sub release_allocation +{ + if ($have_slurm) + { + Log (undef, "release job allocation"); + system "scancel $ENV{SLURM_JOBID}"; + } +} + + +sub readfrompipes +{ + my $gotsome = 0; + foreach my $job (keys %reader) + { + my $buf; + while (0 < sysread ($reader{$job}, $buf, 8192)) + { + print STDERR $buf if $ENV{MR_DEBUG}; + $jobstep[$job]->{stderr} .= $buf; + preprocess_stderr ($job); + if (length ($jobstep[$job]->{stderr}) > 16384 && + $jobstep[$job]->{stderr} !~ /\+\+\+mr/) + { + substr ($jobstep[$job]->{stderr}, 0, 8192) = ""; + } + $gotsome = 1; + } + } + return $gotsome; +} + + +sub process_stderr_for_output_key +{ + my $job = shift; + while ($jobstep[$job]->{stderr} =~ s/\+\+\+mrout (.*?)\+\+\+\n//s) + { + $jobstep[$job]->{output} = $1; + $jobsteps_must_output_keys = 1; + } +} + + +sub preprocess_stderr +{ + my $job = shift; + + $jobstep[$job]->{stderr_jobsteps} = [] + if !exists $jobstep[$job]->{stderr_jobsteps}; + + $jobstep[$job]->{stderr} =~ + s{\+\+\+mrjobstep((\/(\d+|\*))? (\d+) (.*?))\+\+\+\n}{ + push (@{ $jobstep[$job]->{stderr_jobsteps} }, $1); + ""; + }gse; + + while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) { + my $line = $1; + if ($line =~ /\+\+\+mr/) { + last; + } + substr $jobstep[$job]->{stderr}, 0, 1+length($line), ""; + Log ($job, "stderr $line"); + if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) { + # whoa. + $main::please_freeze = 1; + } + elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) { + $jobstep[$job]->{node_fail} = 1; + ban_node_by_slot($jobstep[$job]->{slotindex}); + } + } +} + + +sub process_stderr +{ + my $job = shift; + my $success = shift; + preprocess_stderr ($job); + + map { + Log ($job, "stderr $_"); + } split ("\n", $jobstep[$job]->{stderr}); + + if (!$success || !exists $jobstep[$job]->{stderr_jobsteps}) + { + delete $jobstep[$job]->{stderr_jobsteps}; + return; + } + + foreach (@{ $jobstep[$job]->{stderr_jobsteps} }) + { + /^(?:\/(\d+|\*))? 
(\d+) (.*)/s; + my ($merge, $level, $input) = ($1, $2, $3); + my $newjobref; + if ($merge) + { + push @jobstep_tomerge, $input; + $jobstep_tomerge_level = $level; + if ($merge !~ /\D/ && @jobstep_tomerge >= $merge) + { + $newjobref = { input => join ("\n", + splice @jobstep_tomerge, 0, $merge), + level => $level, + attempts => 0 }; + } + } + else + { + $newjobref = { input => $input, + level => $level, + attempts => 0 }; + } + if ($newjobref) + { + push @jobstep, $newjobref; + push @jobstep_todo, $#jobstep; + } + } + delete $jobstep[$job]->{stderr_jobsteps}; +} + + +sub collate_output +{ + Log (undef, "collate"); + $whc->write_start (1); + my $key; + for (@jobstep) + { + next if !exists $_->{output} || $_->{exitcode} != 0; + my $output = $_->{output}; + if ($output !~ /^[0-9a-f]{32}/) + { + $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/; + $whc->write_data ($output); + } + elsif (@jobstep == 1) + { + $key = $output; + $whc->write_finish; + } + elsif (defined (my $outblock = $whc->fetch_block ($output))) + { + $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/; + $whc->write_data ($outblock); + } + else + { + my $errstr = $whc->errstr; + $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n"); + $success = 0; + } + } + $key = $whc->write_finish if !defined $key; + if ($key) + { + Log (undef, "outputkey $key"); + dbh_do ("update mrjob set output=? where id=?", undef, + $key, $job_id) + or Log (undef, "db update failed: ".$DBI::errstr); + } + else + { + Log (undef, "outputkey undef"); + } + return $key; +} + + +sub killem +{ + foreach (@_) + { + my $sig = 2; # SIGINT first + if (exists $proc{$_}->{"sent_$sig"} && + time - $proc{$_}->{"sent_$sig"} > 4) + { + $sig = 15; # SIGTERM if SIGINT doesn't work + } + if (exists $proc{$_}->{"sent_$sig"} && + time - $proc{$_}->{"sent_$sig"} > 4) + { + $sig = 9; # SIGKILL if SIGTERM doesn't work + } + if (!exists $proc{$_}->{"sent_$sig"}) + { + Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_"); + kill $sig, $_; + select (undef, undef, undef, 0.1); + if ($sig == 2) + { + kill $sig, $_; # srun wants two SIGINT to really interrupt + } + $proc{$_}->{"sent_$sig"} = time; + $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"}; + } + } +} + + +sub fhbits +{ + my($bits); + for (@_) { + vec($bits,fileno($_),1) = 1; + } + $bits; +} + + +sub Log # ($jobstep_id, $logmessage) +{ + if ($_[1] =~ /\n/) { + for my $line (split (/\n/, $_[1])) { + Log ($_[0], $line); + } + return; + } + my $fh = select STDERR; $|=1; select $fh; + my $message = sprintf ("%s %d %s %s", $job_id, $$, @_); + $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge; + $message .= "\n"; + my $datetime; + if ($metastream || -t STDERR) { + my @gmtime = gmtime; + $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", + $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]); + } + print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message); + + return if !$metastream; + $metastream->write_data ($datetime . " " . 
$message); +} + + +sub reconnect_database +{ + return if !$have_database; + return if ($dbh && $dbh->do ("select now()")); + for (1..16) + { + $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN); + if ($dbh) { + $dbh->{InactiveDestroy} = 1; + return; + } + warn ($DBI::errstr); + sleep $_; + } + croak ($DBI::errstr) if !$dbh; +} + + +sub dbh_do +{ + return 1 if !$have_database; + my $ret = $dbh->do (@_); + return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/); + reconnect_database(); + return $dbh->do (@_); +} + + +sub croak +{ + my ($package, $file, $line) = caller; + my $message = "@_ at $file line $line\n"; + Log (undef, $message); + freeze() if @jobstep_todo; + collate_output() if @jobstep_todo; + cleanup(); + save_meta() if $metastream; + die; +} + + +sub cleanup +{ + return if !$have_database || !$dbh; + + reconnect_database(); + my $sth; + $sth = $dbh->prepare ("update mrjobmanager set finishtime=now() where id=?"); + $sth->execute ($jobmanager_id); + $sth = $dbh->prepare ("update mrjob set success=0, finishtime=now() where id=? and jobmanager_id=? and finishtime is null"); + $sth->execute ($job_id, $jobmanager_id); +} + + +sub save_meta +{ + reconnect_database(); + my $justcheckpoint = shift; # false if this will be the last meta saved + my $m = $metastream; + $m = $m->copy if $justcheckpoint; + $m->write_finish; + my $key = $m->as_key; + undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it + Log (undef, "meta key is $key"); + dbh_do ("update mrjob set metakey=? where id=?", + undef, + $key, $job_id); +} + + +sub freeze_if_want_freeze +{ + if ($main::please_freeze) + { + release_allocation(); + if (@_) + { + # kill some srun procs before freeze+stop + map { $proc{$_} = {} } @_; + while (%proc) + { + killem (keys %proc); + select (undef, undef, undef, 0.1); + my $died; + while (($died = waitpid (-1, WNOHANG)) > 0) + { + delete $proc{$died}; + } + } + } + freeze(); + collate_output(); + cleanup(); + save_meta(); + exit 0; + } +} + + +sub freeze +{ + Log (undef, "freeze"); + + my $freezer = new Warehouse::Stream (whc => $whc); + $freezer->clear; + $freezer->name ("."); + $freezer->write_start ("state.txt"); + + $freezer->write_data (join ("\n", + "job $Job->{id}", + map + { + $_ . "=" . freezequote($Job->{$_}) + } grep { $_ ne "id" } keys %$Job) . "\n\n"); + + foreach my $Jobstep (@jobstep) + { + my $str = join ("\n", + map + { + $_ . "=" . freezequote ($Jobstep->{$_}) + } grep { + $_ !~ /^stderr|slotindex|node_fail/ + } keys %$Jobstep); + $freezer->write_data ($str."\n\n"); + } + if (@jobstep_tomerge) + { + $freezer->write_data + ("merge $jobstep_tomerge_level " + . freezequote (join ("\n", + map { freezequote ($_) } @jobstep_tomerge)) + . "\n\n"); + } + + $freezer->write_finish; + my $frozentokey = $freezer->as_key; + undef $freezer; + Log (undef, "frozento key is $frozentokey"); + dbh_do ("update mrjob set frozentokey=? 
where id=?", undef, + $frozentokey, $job_id); + my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3); + Log (undef, "frozento+K key is $kfrozentokey"); + return $frozentokey; +} + + +sub thaw +{ + my $key = shift; + Log (undef, "thaw from $key"); + + @jobstep = (); + @jobstep_done = (); + @jobstep_todo = (); + @jobstep_tomerge = (); + $jobstep_tomerge_level = 0; + my $frozenjob = {}; + + my $stream = new Warehouse::Stream ( whc => $whc, + hash => [split (",", $key)] ); + $stream->rewind; + while (my $dataref = $stream->read_until (undef, "\n\n")) + { + if ($$dataref =~ /^job /) + { + foreach (split ("\n", $$dataref)) + { + my ($k, $v) = split ("=", $_, 2); + $frozenjob->{$k} = freezeunquote ($v); + } + next; + } + + if ($$dataref =~ /^merge (\d+) (.*)/) + { + $jobstep_tomerge_level = $1; + @jobstep_tomerge + = map { freezeunquote ($_) } split ("\n", freezeunquote($2)); + next; + } + + my $Jobstep = { }; + foreach (split ("\n", $$dataref)) + { + my ($k, $v) = split ("=", $_, 2); + $Jobstep->{$k} = freezeunquote ($v) if $k; + } + $Jobstep->{attempts} = 0; + push @jobstep, $Jobstep; + + if ($Jobstep->{exitcode} eq "0") + { + push @jobstep_done, $#jobstep; + } + else + { + push @jobstep_todo, $#jobstep; + } + } + + foreach (qw (mrfunction revision inputkey knobs)) + { + $Job->{$_} = $frozenjob->{$_}; + } + dbh_do + ("update mrjob + set mrfunction=?, revision=?, input0=?, knobs=? + where id=?", undef, + $Job->{mrfunction}, + $Job->{revision}, + $Job->{inputkey}, + $Job->{knobs}, + $Job->{id}, + ); +} + + +sub freezequote +{ + my $s = shift; + $s =~ s/\\/\\\\/g; + $s =~ s/\n/\\n/g; + return $s; +} + + +sub freezeunquote +{ + my $s = shift; + $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge; + return $s; +} + + +sub srun +{ + my $srunargs = shift; + my $execargs = shift; + my $opts = shift || {}; + my $stdin = shift; + my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs; + print STDERR (join (" ", + map { / / ? "'$_'" : $_ } + (@$args)), + "\n") + if $ENV{MR_DEBUG}; + + if (defined $stdin) { + my $child = open STDIN, "-|"; + defined $child or die "no fork: $!"; + if ($child == 0) { + print $stdin or die $!; + close STDOUT or die $!; + exit 0; + } + } + + return system (@$args) if $opts->{fork}; + + exec @$args; + warn "ENV size is ".length(join(" ",%ENV)); + die "exec failed: $!: @$args"; +} + + +sub ban_node_by_slot { + # Don't start any new jobsteps on this node for 60 seconds + my $slotid = shift; + $slot[$slotid]->{node}->{hold_until} = 60 + scalar time; + Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds"); +} -- 2.30.2