=head1 NAME
-whjobmanager: Execute job steps, save snapshots as requested, collate output.
+crunch-job: Execute job steps, save snapshots as requested, collate output.
=head1 SYNOPSIS
-Obtain job details from database, run tasks on compute nodes
-(typically invoked by scheduler on cloud controller):
+Obtain job details from Arvados, run tasks on compute nodes (typically
+invoked by scheduler on controller):
- whjobmanager jobid
+ crunch-job --job x-y-z
Obtain job details from command line, run tasks on local machine
(typically invoked by application or developer on VM):
- whjobmanager revision=PATH mrfunction=FUNC inputkey=LOCATOR \
- [stepspernode=N] [SOMEKNOB=value] ...
+ crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
+
+=head1 OPTIONS
+
+=over
+
+=item --force-unlock
+
+If the job is already locked, steal the lock and run it anyway.
+
+=item --git-dir
+
+Path to .git directory where the specified commit is found.
+
+=item --job-api-token
+
+Arvados API authorization token to use during the course of the job.
+
+=back
=head1 RUNNING JOBS LOCALLY
-whjobmanager(1p)'s log messages appear on stderr, and are saved in the
-warehouse at each checkpoint and when the job finishes.
+crunch-job's log messages appear on stderr along with the job tasks'
+stderr streams. The log is saved in Keep at each checkpoint and when
+the job finishes.
If the job succeeds, the job's output locator is printed on stdout.
-If a job step outputs anything to stderr, it appears in
-whjobmanager(1p)'s log when the step finishes.
-
While the job is running, the following signals are accepted:
=over
=item control-C, SIGINT, SIGQUIT
-Save a checkpoint, terminate any job steps that are running, and stop.
+Save a checkpoint, terminate any job tasks that are running, and stop.
=item SIGALRM
Save a checkpoint and continue.
-=back
+=item SIGHUP
-=head1 SEE ALSO
+Refresh node allocation (i.e., check whether any nodes have been added
+to or removed from the allocation). Currently this is a no-op.
-whintro(1p), wh(1p)
+=back
=cut
use strict;
-use DBI;
use POSIX ':sys_wait_h';
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Arvados;
+use Getopt::Long;
use Warehouse;
use Warehouse::Stream;
+use IPC::System::Simple qw(capturex);
$ENV{"TMPDIR"} ||= "/tmp";
-
-do '/etc/warehouse/warehouse-server.conf';
+$ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
+$ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
+mkdir ($ENV{"CRUNCH_TMP"});
+
+my $force_unlock;
+my $git_dir;
+my $jobspec;
+my $job_api_token;
+my $resume_stash;
+GetOptions('force-unlock' => \$force_unlock,
+ 'git-dir=s' => \$git_dir,
+ 'job=s' => \$jobspec,
+ 'job-api-token=s' => \$job_api_token,
+ 'resume-stash=s' => \$resume_stash,
+ );
+
+if (defined $job_api_token) {
+ $ENV{ARVADOS_API_TOKEN} = $job_api_token;
+}
my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $have_database = @ARGV == 1 && $ARGV[0] =~ /^\d+$/;
+# Guard against an omitted --job option: $jobspec would be undef and the
+# pattern match would warn (and later JSON::decode_json would die obscurely).
+my $job_has_uuid = defined $jobspec && $jobspec =~ /^[-a-z\d]+$/;
+$SIG{'HUP'} = sub
+{
+ 1;
+};
$SIG{'USR1'} = sub
{
- $main::ENV{MR_DEBUG} = 1;
+ $main::ENV{CRUNCH_DEBUG} = 1;
};
$SIG{'USR2'} = sub
{
- $main::ENV{MR_DEBUG} = 0;
+ $main::ENV{CRUNCH_DEBUG} = 0;
};
-my $whc = new Warehouse or croak ("failed to create Warehouse client");
-my $metastream = new Warehouse::Stream (whc => $whc);
+my $arv = Arvados->new;
+my $metastream = Warehouse::Stream->new(whc => new Warehouse);
$metastream->clear;
-$metastream->name (".");
-$metastream->write_start ("log.txt");
-
-
+$metastream->write_start('log.txt');
+my $User = {};
my $Job = {};
my $job_id;
my $dbh;
my $sth;
-if ($have_database)
+if ($job_has_uuid)
{
- ($job_id) = @ARGV;
-
- $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
- croak ($DBI::errstr) if !$dbh;
- $dbh->{InactiveDestroy} = 1;
-
- $sth = $dbh->prepare ("select * from mrjob where id=?");
- $sth->execute ($job_id) or croak ($dbh->errstr);
- $Job = $sth->fetchrow_hashref or croak ($sth->errstr);
+ $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+ $User = $arv->{'users'}->{'current'}->execute;
+  if (!$force_unlock) {
+    if ($Job->{'is_locked_by'}) {
+      croak("Job is locked: " . $Job->{'is_locked_by'});
+    }
+    # "ne undef" stringifies undef to "" (warning under "use warnings")
+    # and therefore tests inequality with the empty string, not nullness.
+    # defined() is the correct test for a non-null 'success' flag.
+    if (defined $Job->{'success'}) {
+      croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
+    }
+    if ($Job->{'running'}) {
+      croak("Job 'running' flag is already set");
+    }
+    if ($Job->{'started_at'}) {
+      croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
+    }
+  }
}
-
else
{
- my %knob;
- foreach (@ARGV)
- {
- if (/([a-z].*?)=(.*)/) {
- $Job->{$1} = $2;
- } elsif (/(.*?)=(.*)/) {
- $knob{$1} = $2;
- }
- }
- $Job->{knobs} = join ("\n", map { "$_=$knob{$_}" } sort keys %knob);
+ $Job = JSON::decode_json($jobspec);
- if (!$Job->{thawedfromkey})
+ if (!$resume_stash)
{
map { croak ("No $_ specified") unless $Job->{$_} }
- qw(mrfunction revision inputkey);
+ qw(script script_version script_parameters);
}
- if (!defined $Job->{id}) {
- chomp ($Job->{id} = sprintf ("%d.%d\@%s", time, $$, `hostname`));
+ if (!defined $Job->{'uuid'}) {
+ my $hostname = `hostname -s`;
+ chomp $hostname;
+ $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
}
- $job_id = $Job->{id};
}
+$job_id = $Job->{'uuid'};
-$Job->{inputkey} = $Job->{input0} if !exists $Job->{inputkey};
-delete $Job->{input0};
-
-
-
-my $max_ncpus;
-map { $max_ncpus = $1 if /^STEPSPERNODE=(.*)/ } split ("\n", $$Job{knobs});
-$max_ncpus = $1 if $$Job{nodes} =~ /\&(\d+)/;
-$max_ncpus = $$Job{stepspernode} if $$Job{stepspernode};
-my $maxstepspernode;
-
+$Job->{'resource_limits'} ||= {};
+$Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
+my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
Log (undef, "check slurm allocation");
foreach (@sinfo)
{
my ($ncpus, $slurm_nodelist) = split;
- $ncpus = $max_ncpus if defined ($max_ncpus) && $ncpus > $max_ncpus && $max_ncpus > 0;
- $maxstepspernode = $ncpus if !defined $maxstepspernode || $maxstepspernode < $ncpus;
+ $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
my @nodelist;
while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
my $jobmanager_id;
-if ($have_database)
+if ($job_has_uuid)
{
# Claim this job, and make sure nobody else does
- $sth = $dbh->prepare ("insert into mrjobmanager
- (pid, revision, starttime)
- values (?, ?, now())");
- my $rev = q/$Revision$/;
- $rev =~ /\d+/;
- $sth->execute ($$, +$&) or croak ($dbh->errstr);
-
- $sth = $dbh->prepare ("select last_insert_id()");
- $sth->execute or croak ($dbh->errstr);
- ($jobmanager_id) = $sth->fetchrow_array;
-
- $sth = $dbh->prepare ("update mrjob set jobmanager_id=?, starttime=now()
- where id=? and jobmanager_id is null");
- $sth->execute ($jobmanager_id, $job_id) or croak ($dbh->errstr);
-
- $sth = $dbh->prepare ("select jobmanager_id from mrjob
- where id=?");
- $sth->execute ($job_id) or croak ($dbh->errstr);
- my ($check_jobmanager_id) = $sth->fetchrow_array;
- if ($check_jobmanager_id != $jobmanager_id)
- {
- # race condition - another job manager proc stole the job
- Log (undef,
- "job taken by jobmanager id $check_jobmanager_id");
- exit (1);
+ $Job->{'is_locked_by'} = $User->{'uuid'};
+ $Job->{'started_at'} = gmtime;
+ $Job->{'running'} = 1;
+ $Job->{'success'} = undef;
+ $Job->{'tasks_summary'} = { 'failed' => 0,
+ 'todo' => 1,
+ 'running' => 0,
+ 'done' => 0 };
+  if ($job_has_uuid) {
+    # UUIDs are strings; numeric "==" numifies both sides to 0 and always
+    # succeeds, so the lock-race check would never fire.  Use string "eq".
+    unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {
+      croak("Error while updating / locking job");
+    }
+  }
}
my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
-$ENV{"MR_JOB_ID"} = $job_id;
+$ENV{"CRUNCH_JOB_UUID"} = $job_id;
$ENV{"JOB_UUID"} = $job_id;
}
else
{
- push @jobstep, { input => $Job->{inputkey},
- level => 0,
- attempts => 0,
+ my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
+ 'job_uuid' => $Job->{'uuid'},
+ 'sequence' => 0,
+ 'qsequence' => 0,
+ 'parameters' => {},
+ });
+ push @jobstep, { 'level' => 0,
+ 'attempts' => 0,
+ 'arvados_task' => $first_task,
};
push @jobstep_todo, 0;
}
-
-mkdir ($ENV{"TMPDIR"}."/mrcompute");
-if ($$Job{knobs} =~ /^GPG_KEYS=(.*)/m) {
- # set up a fresh gnupg directory just for this process
- # TODO: reap abandoned gnupg dirs
- system ("rm", "-rf", $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$");
- mkdir ($ENV{"TMPDIR"}."/mrcompute");
- mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg", 0700);
- mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg/$$", 0700) || croak ("mkdir: $!");
-
- my $newhomedir = $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$";
-
- open C, ">", $newhomedir."/gpg.conf";
- print C "always-trust\n";
- close C;
-
- # import secret keys referenced in job spec
- my $hashes = $1;
- $hashes =~ s/\'/\'\\\'\'/g;
- my $gpg_out = `whget '$hashes' - | gpg --homedir "$newhomedir" --import 2>&1`;
- my %encrypt_to;
- while ($gpg_out =~ /^gpg: key ([0-9A-F]{8}): /gm) {
- my $keynum = $1;
- while (`gpg --homedir "$newhomedir" --list-keys "$keynum"` =~ /^uid\s.*<(.+?)>/gm) {
- $encrypt_to{$1} = 1;
- }
- }
- if (!%encrypt_to) {
- croak ("GPG_KEYS provided but failed to import keys:\n$gpg_out");
- }
-
- if ($have_database) {
-
- # make sure the job request was signed by all of the secret keys
- # contained in GPG_KEYS (otherwise, any VM can just copy the
- # GPG_KEYS hash from an existing mr-job and submit new jobs that can
- # read private data)
-
- my %did_not_sign;
- my $seckeys = `gpg --homedir "$newhomedir" --list-secret-keys --with-fingerprint`;
- while ($seckeys =~ /Key fingerprint.*?([0-9A-F][0-9A-F ]+[0-9A-F])/mgi) {
- $did_not_sign{$1} = 1;
- }
- my $srfile = "$newhomedir/signedrequest";
- open SREQ, ">", $srfile;
- print SREQ $$Job{"signedrequest"};
- close SREQ;
- my $gpg_v = `gpg --homedir "$newhomedir" --verify --with-fingerprint "$srfile" 2>&1 && echo ok`;
- unlink $srfile;
- if ($gpg_v =~ /\nok\n$/s) {
- while ($gpg_v =~ /Good signature.*? key fingerprint: (\S[^\n]+\S)/sgi) {
- delete $did_not_sign{$1};
- }
- }
- if (%did_not_sign) {
- croak (join ("\n",
- "Some secret keys provided did not sign this job request:",
- keys %did_not_sign) . "\n");
- }
- }
-
- my $hostname = `hostname`;
- chomp ($hostname);
-
- # tell mrjobsteps the decrypted secret key(s) and all public key(s) they might need
- $ENV{"GPG_KEYS"} = `gpg --homedir "$newhomedir" --export-secret-keys --armor`;
- $ENV{"GPG_PUBLIC_KEYS"} = `gpg --export --armor | ENCRYPT_TO= whput -`;
-
- # import all secret keys from my real home dir
- `gpg --export-secret-keys | gpg --homedir "$newhomedir" --import 2>&1`;
-
- # use the new gnupg dir from now on
- $ENV{"GNUPGHOME"} = $newhomedir;
-
- # if I have a secret key for root@{host} or {user}@{host} or
- # {configured-controller-gpg-uid}, add that as a recipient too so
- # I'll be able to read frozentokeys etc. later
- my %allkeys;
- while (`gpg --list-secret-keys` =~ /^uid\s.*?<(.+?)>/gm) {
- $allkeys{$1} = 1;
- }
- my $encrypting_to_self = 0;
- my @try_these_uids = ("root\@".$hostname, $ENV{"USER"}."\@".$hostname);
- push @try_these_uids, ($whc->{config}->{controller_gpg_uid})
- if exists $whc->{config}->{controller_gpg_uid};
- foreach my $id (@try_these_uids) {
- if (exists $allkeys{$id}) {
- $encrypt_to{$id} = 1;
- $encrypting_to_self = 1;
- last;
- }
- }
-
- if (!$encrypting_to_self) {
- croak ("Failed to find a secret key for any of [@try_these_uids] -- giving up instead of writing meta/freeze data that I won't be able to read");
- }
-
- # tell the client library (and child procs and jobsteps) to encrypt using these keys
- $ENV{"ENCRYPT_TO"} = join (",", sort keys %encrypt_to);
- Log (undef, "encrypt_to ('".$ENV{"ENCRYPT_TO"}."')");
- $whc->set_config ("encrypt_to", $ENV{"ENCRYPT_TO"});
-}
-
+my $build_script;
+do {
+ local $/ = undef;
+ $build_script = <DATA>;
+};
-$ENV{"MR_REVISION"} = $Job->{revision};
+$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
-my $git_build_script;
my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
if ($skip_install)
{
- $ENV{"MR_REVISION_SRCDIR"} = $Job->{revision};
+ $ENV{"CRUNCH_SRC"} = $Job->{revision};
}
else
{
Log (undef, "Install revision ".$Job->{revision});
my $nodelist = join(",", @node);
- # Clean out mrcompute/work and mrcompute/opt
+ # Clean out crunch_tmp/work and crunch_tmp/opt
my $cleanpid = fork();
if ($cleanpid == 0)
{
- srun (["srun", "--nodelist=$nodelist", "-D", $ENV{TMPDIR}],
- ['bash', '-c', 'if mount | grep -q $TMPDIR/mrcompute/work/; then sudo /bin/umount $TMPDIR/mrcompute/work/* 2>/dev/null; fi; sleep 1; rm -rf $TMPDIR/mrcompute/work $TMPDIR/mrcompute/opt']);
+ srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+ ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
exit (1);
}
while (1)
# Install requested code version
- my $build_script;
my @execargs;
my @srunargs = ("srun",
"--nodelist=$nodelist",
- "-D", $ENV{TMPDIR}, "--job-name=$job_id");
+ "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
- $ENV{"MR_REVISION"} = $Job->{revision};
- $ENV{"MR_REVISION_SRCDIR"} = "$ENV{TMPDIR}/mrcompute/warehouse-apps";
- $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/opt";
+ $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
+ $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
+ $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
my $commit;
- my $treeish = $Job->{revision};
- my $repo = $Job->{git_clone_url} || $whc->get_config("git_clone_url");
+ my $git_archive;
+ my $treeish = $Job->{'script_version'};
+ my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
+ # Todo: let script_version specify repository instead of expecting
+ # parent process to figure it out.
+ $ENV{"CRUNCH_SRC_URL"} = $repo;
# Create/update our clone of the remote git repo
- if (!-d $ENV{MR_REVISION_SRCDIR}) {
- system(qw(git clone), $repo, $ENV{MR_REVISION_SRCDIR}) == 0
+ if (!-d $ENV{"CRUNCH_SRC"}) {
+ system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
or croak ("git clone $repo failed: exit ".($?>>8));
- system("cd $ENV{MR_REVISION_SRCDIR} && git config clean.requireForce false");
+ system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
}
- `cd $ENV{MR_REVISION_SRCDIR} && git fetch -q`;
+ `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
# If this looks like a subversion r#, look for it in git-svn commit messages
if ($treeish =~ m{^\d{1,4}$}) {
- my $gitlog = `cd $ENV{MR_REVISION_SRCDIR} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
+ my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
chomp $gitlog;
if ($gitlog =~ /^[a-f0-9]{40}$/) {
$commit = $gitlog;
$cooked_treeish = "origin/$treeish";
}
- my $found = `cd $ENV{MR_REVISION_SRCDIR} && git rev-list -1 $cooked_treeish`;
+ my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
chomp $found;
if ($found =~ /^[0-9a-f]{40}$/s) {
$commit = $found;
# frozentokey, logs, etc. -- instead of an abbreviation or a
# branch name which can become ambiguous or point to a
# different commit in the future.
- $ENV{"MR_REVISION"} = $commit;
- $Job->{revision} = $commit;
- dbh_do
- ("update mrjob set revision=? where id=?",
- undef,
- $Job->{revision}, $Job->{id});
+ $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
Log (undef, "Using commit $commit for tree-ish $treeish");
+ if ($commit ne $treeish) {
+ $Job->{'script_version'} = $commit;
+ !$job_has_uuid or $Job->save() or croak("Error while updating job");
+ }
}
}
}
if (defined $commit) {
- $ENV{"MR_GIT_COMMIT"} = $commit;
- $ENV{"MR_GIT_CLONE_URL"} = $repo;
- @execargs = ("sh", "-c",
- "mkdir -p $ENV{TMPDIR}/mrcompute/opt && cd $ENV{TMPDIR}/mrcompute && perl - $ENV{MR_REVISION_SRCDIR} $commit $repo");
- open GBS, "<", `echo -n \$(which whjob-checkout-and-build)`
- or croak ("can't find whjob-checkout-and-build");
- local $/ = undef;
- $git_build_script = <GBS>;
- close GBS;
- $build_script = $git_build_script;
- }
- elsif ($treeish =~ m{^(\d{1,5})$}) {
- # Want a subversion r# but couldn't find it in git-svn history -
- # might as well try using the subversion repo in case it's still
- # there.
- $ENV{"INSTALL_REPOS"} = $whc->get_config("svn_root");
- $ENV{"INSTALL_REVISION"} = $Job->{revision};
- $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/revision/$treeish";
- $ENV{"MR_REVISION_SRCDIR"} = "$ENV{MR_REVISION_INSTALLDIR}/src";
+ $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
@execargs = ("sh", "-c",
- "mkdir -p $ENV{TMPDIR}/mrcompute/revision && cd $ENV{TMPDIR}/mrcompute && ( [ -e $ENV{MR_REVISION_INSTALLDIR}/.tested ] || ( svn export --quiet \"\$INSTALL_REPOS/installrevision\" && INSTALLREVISION_NOLOCK=1 ./installrevision ) )");
+ "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+ $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
}
else {
croak ("could not figure out commit id for $treeish");
my $installpid = fork();
if ($installpid == 0)
{
- srun (\@srunargs, \@execargs, {}, $build_script);
+ srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
exit (1);
}
while (1)
-foreach (qw (mrfunction revision nodes stepspernode inputkey))
+foreach (qw (script script_version script_parameters resource_limits))
{
- Log (undef, $_ . " " . $Job->{$_});
+ Log (undef,
+ "$_ " .
+ (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
}
foreach (split (/\n/, $Job->{knobs}))
{
my @freeslot = (0..$#slot);
my @holdslot;
my %reader;
-my ($id, $input, $attempts);
my $progress_is_dirty = 1;
my $progress_stats_updated = 0;
undef $dbh;
undef $sth;
-
delete $ENV{"GNUPGHOME"};
- $ENV{"MR_ID"} = $id;
- $ENV{"MR_INPUT"} = $Jobstep->{input};
- $ENV{"MR_KNOBS"} = $Job->{knobs};
- $ENV{"MR_LEVEL"} = $level;
- $ENV{"MR_FUNCTION"} = $Job->{mrfunction};
- $ENV{"MR_INPUT0"} = $Job->{inputkey};
- $ENV{"MR_INPUTKEY"} = $Job->{inputkey};
- $ENV{"MR_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
- $ENV{"MR_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
- $ENV{"MR_SLOT"} = $slot[$childslot]->{cpu}; # deprecated
- $ENV{"MR_JOB_TMP"} = $ENV{"TMPDIR"}."/job/work";
- $ENV{"MR_JOBSTEP_TMP"} = $ENV{"TMPDIR"}."/job/work/".$slot[$childslot]->{cpu};
- $ENV{"MR_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
- $ENV{"MOGILEFS_TRACKERS"} = join (",", @main::mogilefs_trackers);
- $ENV{"MOGILEFS_DOMAIN"} = $main::mogilefs_default_domain;
- $ENV{"MOGILEFS_CLASS"} = $main::mogilefs_default_class;
-
- $ENV{"TASK_UUID"} = $ENV{"JOB_UUID"} . "-" . $id;
+ $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
$ENV{"TASK_QSEQUENCE"} = $id;
- $ENV{"TASK_SEQUENCE"} = $Jobstep->{level};
+ $ENV{"TASK_SEQUENCE"} = $level;
+ $ENV{"JOB_SCRIPT"} = $Job->{script};
+ while (my ($param, $value) = each %{$Job->{script_parameters}}) {
+ $param =~ tr/a-z/A-Z/;
+ $ENV{"JOB_PARAMETER_$param"} = $value;
+ }
+ $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
+ $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
+ $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
+ $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
$ENV{"GZIP"} = "-n";
my @srunargs = (
"srun",
"--nodelist=".$childnode->{name},
- qw(-n1 -c1 -N1 -D), $ENV{TMPDIR},
+ qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
"--job-name=$job_id.$id.$$",
);
my @execargs = qw(sh);
- my $script = "";
+ my $build_script_to_send = "";
my $command =
- "mkdir -p $ENV{TMPDIR}/mrcompute/revision "
- ."&& cd $ENV{TMPDIR}/mrcompute ";
- if ($git_build_script)
+ "mkdir -p $ENV{CRUNCH_TMP}/revision "
+ ."&& cd $ENV{CRUNCH_TMP} ";
+ if ($build_script)
{
- $script = $git_build_script;
+ $build_script_to_send = $build_script;
$command .=
- "&& perl - $ENV{MR_REVISION_SRCDIR} $ENV{MR_GIT_COMMIT} $ENV{MR_GIT_CLONE_URL}";
+ "&& perl -";
}
elsif (!$skip_install)
{
$command .=
"&& "
."( "
- ." [ -e '$ENV{MR_REVISION_INSTALLDIR}/.tested' ] "
+ ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
."|| "
." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
." && ./installrevision "
." ) "
.") ";
}
- if (exists $ENV{GPG_KEYS}) {
- $command .=
- "&& mkdir -p '$ENV{MR_JOBSTEP_TMP}' && (sudo /bin/umount '$ENV{MR_JOBSTEP_TMP}' 2>/dev/null || true) && rm -rf '$ENV{MR_JOBSTEP_TMP}' && exec $ENV{MR_REVISION_SRCDIR}/mapreduce/ecryptfs-wrapper -d '$ENV{MR_JOBSTEP_TMP}' -p $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager";
- } else {
- $command .=
- "&& exec $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager";
- }
+ $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
+ $command .=
+ "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
my @execargs = ('bash', '-c', $command);
- srun (\@srunargs, \@execargs, undef, $script);
+ srun (\@srunargs, \@execargs, undef, $build_script_to_send);
exit (1);
}
close("writer");
croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
$slot[$childslot]->{pid} = $childpid;
+ Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
Log ($id, "child $childpid started on $childslotname");
$Jobstep->{attempts} ++;
$Jobstep->{starttime} = time;
$Jobstep->{node} = $childnode->{name};
$Jobstep->{slotindex} = $childslot;
delete $Jobstep->{stderr};
- delete $Jobstep->{output};
delete $Jobstep->{finishtime};
splice @jobstep_todo, $todo_ptr, 1;
}
# give up if no nodes are succeeding
- if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
+ if (!grep { $_->{node}->{losing_streak} == 0 &&
+ $_->{node}->{hold_count} < 4 } @slot) {
my $message = "Every node has failed -- giving up on this round";
Log (undef, $message);
last THISROUND;
freeze_if_want_freeze();
-if (@jobstep_tomerge && !@jobstep_todo)
-{
- push @jobstep, { input => join ("\n", splice @jobstep_tomerge, 0),
- level => $jobstep_tomerge_level,
- attempts => 0 };
- push @jobstep_todo, $#jobstep;
-}
-
-
if (!defined $success)
{
if (@jobstep_todo &&
release_allocation();
freeze();
-my $key = &collate_output();
-$success = 0 if !$key;
-
+$Job->{'output'} = &collate_output();
+$Job->{'success'} = $Job->{'output'} && $success;
+$Job->save if $job_has_uuid;
-if ($key)
+if ($Job->{'output'})
{
- my @keepkey;
- foreach my $hash (split ",", $key)
- {
- my $keephash = $whc->store_in_keep (hash => $hash,
- nnodes => 3);
- if (!$keephash)
- {
- Log (undef, "store_in_keep (\"$hash\") failed: ".$whc->errstr);
- $keephash = $hash;
- }
- push @keepkey, $keephash;
- }
- my $keepkey = join (",", @keepkey);
- Log (undef, "outputkey+K $keepkey");
- print "$keepkey\n" if $success;
-
- if ($output_in_keep)
- {
- $key = $keepkey;
+ eval {
+ my $manifest_text = capturex("whget", $Job->{'output'});
+ $arv->{'collections'}->{'create'}->execute('collection' => {
+ 'uuid' => $Job->{'output'},
+ 'manifest_text' => $manifest_text,
+ });
+ };
+ if ($@) {
+ Log (undef, "Failed to register output manifest: $@");
}
-
- dbh_do ("update mrjob set output=? where id=?", undef,
- $key, $job_id)
- or croak ($dbh->errstr);
-
- $whc->store_manifest_by_name ($keepkey, undef, "/job$job_id")
- or Log (undef, "store_manifest_by_name (\"$key\", \"/job$job_id\") failed: ".$whc->errstr);
}
-
Log (undef, "finish");
-dbh_do ("update mrjob set finishtime=now(), success=?
- where id=? and jobmanager_id=?", undef,
- $success, $job_id, $jobmanager_id)
- or croak ($dbh->errstr);
-
save_meta();
exit 0;
my ($todo, $done, $running) = (scalar @jobstep_todo,
scalar @jobstep_done,
scalar @slot - scalar @freeslot - scalar @holdslot);
- dbh_do
- ("update mrjob set steps_todo=?,steps_done=?,steps_running=? where id=?",
- undef,
- $todo, $done, $running, $job_id);
+ $Job->{'tasks_summary'} ||= {};
+ $Job->{'tasks_summary'}->{'todo'} = $todo;
+ $Job->{'tasks_summary'}->{'done'} = $done;
+ $Job->{'tasks_summary'}->{'running'} = $running;
+ $Job->save if $job_has_uuid;
Log (undef, "status: $done done, $running running, $todo todo");
$progress_is_dirty = 0;
}
my $elapsed = time - $proc{$pid}->{time};
my $Jobstep = $jobstep[$jobstepid];
- process_stderr_for_output_key ($jobstepid);
-
my $exitcode = $?;
my $exitinfo = "exit $exitcode";
- if (!exists $Jobstep->{output})
- {
- $exitinfo .= " with no output key";
- $exitcode = -1 if $exitcode == 0 && $jobsteps_must_output_keys;
- }
+ $Jobstep->{'arvados_task'}->reload;
+ my $success = $Jobstep->{'arvados_task'}->{success};
- if ($exitcode == 0 && $Jobstep->{node_fail}) {
- $exitinfo .= " but recording as failure";
- $exitcode = -1;
- }
+ Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
- Log ($jobstepid, "child $pid on $whatslot $exitinfo");
+ if (!defined $success) {
+ # task did not indicate one way or the other --> fail
+ $Jobstep->{'arvados_task'}->{success} = 0;
+ $Jobstep->{'arvados_task'}->save;
+ $success = 0;
+ }
- if ($exitcode != 0 || $Jobstep->{node_fail})
+ if (!$success)
{
- --$Jobstep->{attempts} if $Jobstep->{node_fail};
+ my $no_incr_attempts;
+ $no_incr_attempts = 1 if $Jobstep->{node_fail};
+
++$thisround_failed;
++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
$elapsed < 5 &&
$Jobstep->{attempts} > 1) {
Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
+ $no_incr_attempts = 1;
--$Jobstep->{attempts};
}
ban_node_by_slot($proc{$pid}->{slot});
push @jobstep_todo, $jobstepid;
Log ($jobstepid, "failure in $elapsed seconds");
+
+ --$Jobstep->{attempts} if $no_incr_attempts;
+ $Job->{'tasks_summary'}->{'failed'}++;
}
else
{
}
$Jobstep->{exitcode} = $exitcode;
$Jobstep->{finishtime} = time;
- process_stderr ($jobstepid, $exitcode == 0);
- Log ($jobstepid, "output $$Jobstep{output}");
+ process_stderr ($jobstepid, $success);
+ Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
close $reader{$jobstepid};
delete $reader{$jobstepid};
push @freeslot, $proc{$pid}->{slot};
delete $proc{$pid};
+ # Load new tasks
+ my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
+ 'where' => {
+ 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
+ },
+ 'order' => 'qsequence'
+ );
+ foreach my $arvados_task (@{$newtask_list->{'items'}}) {
+ my $jobstep = {
+ 'level' => $arvados_task->{'sequence'},
+ 'attempts' => 0,
+ 'arvados_task' => $arvados_task
+ };
+ push @jobstep, $jobstep;
+ push @jobstep_todo, $#jobstep;
+ }
+
$progress_is_dirty = 1;
1;
}
my $buf;
while (0 < sysread ($reader{$job}, $buf, 8192))
{
- print STDERR $buf if $ENV{MR_DEBUG};
+ print STDERR $buf if $ENV{CRUNCH_DEBUG};
$jobstep[$job]->{stderr} .= $buf;
preprocess_stderr ($job);
- if (length ($jobstep[$job]->{stderr}) > 16384 &&
- $jobstep[$job]->{stderr} !~ /\+\+\+mr/)
+ if (length ($jobstep[$job]->{stderr}) > 16384)
{
substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
}
}
-sub process_stderr_for_output_key
-{
- my $job = shift;
- while ($jobstep[$job]->{stderr} =~ s/\+\+\+mrout (.*?)\+\+\+\n//s)
- {
- $jobstep[$job]->{output} = $1;
- $jobsteps_must_output_keys = 1;
- }
-}
-
-
sub preprocess_stderr
{
my $job = shift;
- $jobstep[$job]->{stderr_jobsteps} = []
- if !exists $jobstep[$job]->{stderr_jobsteps};
-
- $jobstep[$job]->{stderr} =~
- s{\+\+\+mrjobstep((\/(\d+|\*))? (\d+) (.*?))\+\+\+\n}{
- push (@{ $jobstep[$job]->{stderr_jobsteps} }, $1);
- "";
- }gse;
-
while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
my $line = $1;
- if ($line =~ /\+\+\+mr/) {
- last;
- }
substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
Log ($job, "stderr $line");
- if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
+ if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
# whoa.
$main::please_freeze = 1;
}
map {
Log ($job, "stderr $_");
} split ("\n", $jobstep[$job]->{stderr});
-
- if (!$success || !exists $jobstep[$job]->{stderr_jobsteps})
- {
- delete $jobstep[$job]->{stderr_jobsteps};
- return;
- }
-
- foreach (@{ $jobstep[$job]->{stderr_jobsteps} })
- {
- /^(?:\/(\d+|\*))? (\d+) (.*)/s;
- my ($merge, $level, $input) = ($1, $2, $3);
- my $newjobref;
- if ($merge)
- {
- push @jobstep_tomerge, $input;
- $jobstep_tomerge_level = $level;
- if ($merge !~ /\D/ && @jobstep_tomerge >= $merge)
- {
- $newjobref = { input => join ("\n",
- splice @jobstep_tomerge, 0, $merge),
- level => $level,
- attempts => 0 };
- }
- }
- else
- {
- $newjobref = { input => $input,
- level => $level,
- attempts => 0 };
- }
- if ($newjobref)
- {
- push @jobstep, $newjobref;
- push @jobstep_todo, $#jobstep;
- }
- }
- delete $jobstep[$job]->{stderr_jobsteps};
}
sub collate_output
{
+ my $whc = Warehouse->new;
Log (undef, "collate");
$whc->write_start (1);
- my $key;
+ my $joboutput;
for (@jobstep)
{
- next if !exists $_->{output} || $_->{exitcode} != 0;
- my $output = $_->{output};
- if ($output !~ /^[0-9a-f]{32}/)
+ next if (!exists $_->{'arvados_task'}->{output} ||
+ !$_->{'arvados_task'}->{'success'} ||
+ $_->{'exitcode'} != 0);
+ my $output = $_->{'arvados_task'}->{output};
+ if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
{
$output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
$whc->write_data ($output);
}
elsif (@jobstep == 1)
{
- $key = $output;
+ $joboutput = $output;
$whc->write_finish;
}
elsif (defined (my $outblock = $whc->fetch_block ($output)))
$success = 0;
}
}
- $key = $whc->write_finish if !defined $key;
- if ($key)
+ $joboutput = $whc->write_finish if !defined $joboutput;
+ if ($joboutput)
{
- Log (undef, "outputkey $key");
- dbh_do ("update mrjob set output=? where id=?", undef,
- $key, $job_id)
- or Log (undef, "db update failed: ".$DBI::errstr);
+ Log (undef, "output $joboutput");
+ $Job->{'output'} = $joboutput;
+ $Job->save if $job_has_uuid;
}
else
{
- Log (undef, "outputkey undef");
+ Log (undef, "output undef");
}
- return $key;
+ return $joboutput;
}
}
-sub reconnect_database
-{
- return if !$have_database;
- return if ($dbh && $dbh->do ("select now()"));
- for (1..16)
- {
- $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
- if ($dbh) {
- $dbh->{InactiveDestroy} = 1;
- return;
- }
- warn ($DBI::errstr);
- sleep $_;
- }
- croak ($DBI::errstr) if !$dbh;
-}
-
-
-sub dbh_do
-{
- return 1 if !$have_database;
- my $ret = $dbh->do (@_);
- return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
- reconnect_database();
- return $dbh->do (@_);
-}
-
-
sub croak
{
my ($package, $file, $line) = caller;
sub cleanup
{
- return if !$have_database || !$dbh;
-
- reconnect_database();
- my $sth;
- $sth = $dbh->prepare ("update mrjobmanager set finishtime=now() where id=?");
- $sth->execute ($jobmanager_id);
- $sth = $dbh->prepare ("update mrjob set success=0, finishtime=now() where id=? and jobmanager_id=? and finishtime is null");
- $sth->execute ($job_id, $jobmanager_id);
+ return if !$job_has_uuid;
+ $Job->reload if $job_has_uuid;
+ $Job->{'running'} = 0;
+ $Job->{'success'} = 0;
+ $Job->{'finished_at'} = gmtime;
+ $Job->save if $job_has_uuid;
}
sub save_meta
{
- reconnect_database();
my $justcheckpoint = shift; # false if this will be the last meta saved
my $m = $metastream;
$m = $m->copy if $justcheckpoint;
$m->write_finish;
- my $key = $m->as_key;
+ my $loglocator = $m->as_key;
undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
- Log (undef, "meta key is $key");
- dbh_do ("update mrjob set metakey=? where id=?",
- undef,
- $key, $job_id);
+ Log (undef, "meta key is $loglocator");
+ $Job->{'log'} = $loglocator;
+ $Job->save if $job_has_uuid;
}
sub freeze
{
- Log (undef, "freeze");
-
- my $freezer = new Warehouse::Stream (whc => $whc);
- $freezer->clear;
- $freezer->name (".");
- $freezer->write_start ("state.txt");
-
- $freezer->write_data (join ("\n",
- "job $Job->{id}",
- map
- {
- $_ . "=" . freezequote($Job->{$_})
- } grep { $_ ne "id" } keys %$Job) . "\n\n");
-
- foreach my $Jobstep (@jobstep)
- {
- my $str = join ("\n",
- map
- {
- $_ . "=" . freezequote ($Jobstep->{$_})
- } grep {
- $_ !~ /^stderr|slotindex|node_fail/
- } keys %$Jobstep);
- $freezer->write_data ($str."\n\n");
- }
- if (@jobstep_tomerge)
- {
- $freezer->write_data
- ("merge $jobstep_tomerge_level "
- . freezequote (join ("\n",
- map { freezequote ($_) } @jobstep_tomerge))
- . "\n\n");
- }
-
- $freezer->write_finish;
- my $frozentokey = $freezer->as_key;
- undef $freezer;
- Log (undef, "frozento key is $frozentokey");
- dbh_do ("update mrjob set frozentokey=? where id=?", undef,
- $frozentokey, $job_id);
- my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
- Log (undef, "frozento+K key is $kfrozentokey");
- return $frozentokey;
+ Log (undef, "Freeze not implemented");
+ return;
}
sub thaw
{
+ croak ("Thaw not implemented");
+
+ my $whc;
my $key = shift;
Log (undef, "thaw from $key");
}
}
- foreach (qw (mrfunction revision inputkey knobs))
+ foreach (qw (script script_version script_parameters))
{
$Job->{$_} = $frozenjob->{$_};
}
- dbh_do
- ("update mrjob
- set mrfunction=?, revision=?, input0=?, knobs=?
- where id=?", undef,
- $Job->{mrfunction},
- $Job->{revision},
- $Job->{inputkey},
- $Job->{knobs},
- $Job->{id},
- );
+ $Job->save if $job_has_uuid;
}
map { / / ? "'$_'" : $_ }
(@$args)),
"\n")
- if $ENV{MR_DEBUG};
+ if $ENV{CRUNCH_DEBUG};
if (defined $stdin) {
my $child = open STDIN, "-|";
# Don't start any new jobsteps on this node for 60 seconds
my $slotid = shift;
$slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
+ $slot[$slotid]->{node}->{hold_count}++;
Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
}
+
+__DATA__
+#!/usr/bin/perl
+
+# checkout-and-build
+
+use Fcntl ':flock';
+
+my $destdir = $ENV{"CRUNCH_SRC"};
+my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
+my $repo = $ENV{"CRUNCH_SRC_URL"};
+
+open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
+flock L, LOCK_EX;
+# readlink returns undef when "$destdir.commit" does not exist, which would
+# warn on "eq" -- and with an empty $commit would compare "" eq "" and skip
+# the build incorrectly.  Require a non-empty commit and default to "".
+if ($commit && (readlink ("$destdir.commit") || "") eq $commit) {
+  exit 0;
+}
+
+unlink "$destdir.commit";
+open STDOUT, ">", "$destdir.log";
+open STDERR, ">&STDOUT";
+
+mkdir $destdir;
+open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
+print TARX <DATA>;
+if(!close(TARX)) {
+ die "'tar -C $destdir -xf -' exited $?: $!";
+}
+
+my $pwd;
+chomp ($pwd = `pwd`);
+my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
+mkdir $install_dir;
+if (!-e "./install.sh" && -e "./tests/autotests.sh") {
+ # Old version
+ shell_or_die ("./tests/autotests.sh", $install_dir);
+} elsif (-e "./install.sh") {
+ shell_or_die ("./install.sh", $install_dir);
+}
+
+if ($commit) {
+ unlink "$destdir.commit.new";
+ symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
+ rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
+}
+
+close L;
+
+exit 0;
+
+sub shell_or_die
+{
+ if ($ENV{"DEBUG"}) {
+ print STDERR "@_\n";
+ }
+ system (@_) == 0
+ or die "@_ failed: $! exit 0x".sprintf("%x",$?);
+}
+
+__DATA__