From: Tom Clegg
Date: Thu, 23 May 2013 06:47:53 +0000 (-0700)
Subject: start lifting job manager over from free factories to crunch
X-Git-Tag: 1.1.0~3304
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/ac91070f6dc5cc5ecf8756e75fe934b3436ec531

start lifting job manager over from free factories to crunch
---
diff --git a/sdk/perl/lib/Arvados.pm b/sdk/perl/lib/Arvados.pm
index 39a49dea88..18800290a7 100644
--- a/sdk/perl/lib/Arvados.pm
+++ b/sdk/perl/lib/Arvados.pm
@@ -5,7 +5,7 @@ Arvados -- client library for Arvados services
 =head1 SYNOPSIS
 
   use Arvados;
-  $arv = Arvados->new()->build(apiHost => 'arvados.local');
+  $arv = Arvados->new(apiHost => 'arvados.local');
 
   my $instances = $arv->{'pipeline_instances'}->{'list'}->execute();
   print "UUID is ", $instances->{'items'}->[0]->{'uuid'}, "\n";
diff --git a/sdk/perl/lib/Arvados/ResourceMethod.pm b/sdk/perl/lib/Arvados/ResourceMethod.pm
index 392251f48f..577e4ea47a 100644
--- a/sdk/perl/lib/Arvados/ResourceMethod.pm
+++ b/sdk/perl/lib/Arvados/ResourceMethod.pm
@@ -44,6 +44,10 @@ sub execute
             if (!exists $given_params{$param_name}->{$property_name}) {
                 ;
             }
+            # An explicit undef is sent as JSON null; "eq undef" would also match ''.
+            elsif (!defined $given_params{$param_name}->{$property_name}) {
+                $param_value{$property_name} = JSON::null;
+            }
             elsif ($property->{'type'} eq 'boolean') {
                 $param_value{$property_name} = $given_params{$param_name}->{$property_name} ? 
JSON::true : JSON::false;
             }
diff --git a/sdk/perl/lib/Arvados/ResourceProxy.pm b/sdk/perl/lib/Arvados/ResourceProxy.pm
index e6a87764a2..c81d87effb 100644
--- a/sdk/perl/lib/Arvados/ResourceProxy.pm
+++ b/sdk/perl/lib/Arvados/ResourceProxy.pm
@@ -21,6 +21,21 @@ sub save
     $self;
 }
 
+# Re-fetch this resource from the API by uuid and overwrite every
+# attribute already present on this proxy with the value in the
+# response.  Keys that exist only in the response are not added.
+sub reload
+{
+    my $self = shift;
+    my $response = $self->{'resourceAccessor'}->{'get'}->execute('uuid' => $self->{'uuid'});
+    foreach my $param (keys %$self) {
+        if (exists $response->{$param}) {
+            $self->{$param} = $response->{$param};
+        }
+    }
+    $self;
+}
+
 sub resource_parameter_name
 {
     my $self = shift;
diff --git a/services/api/app/models/job_task.rb b/services/api/app/models/job_task.rb
index be0456d576..c951d439f4 100644
--- a/services/api/app/models/job_task.rb
+++ b/services/api/app/models/job_task.rb
@@ -3,6 +3,8 @@ class JobTask < ArvadosModel
   include KindAndEtag
   include CommonApiTemplate
   serialize :parameters, Hash
+  after_update :delete_created_job_tasks_if_failed
+  after_update :assign_created_job_tasks_qsequence_if_succeeded
 
   api_accessible :superuser, :extend => :common do |t|
     t.add :job_uuid
@@ -14,4 +16,25 @@ class JobTask < ArvadosModel
     t.add :progress
     t.add :success
   end
+
+  protected
+
+  # after_update hook: when success has just changed to false, destroy
+  # every task whose created_by_job_task is this task's uuid.
+  def delete_created_job_tasks_if_failed
+    if self.success == false and self.success != self.success_was
+      JobTask.destroy_all(['created_by_job_task = ?', self.uuid])
+    end
+  end
+
+  # after_update hook: when success has just changed to true, stamp the
+  # tasks created by this task with a fresh qsequence.
+  def assign_created_job_tasks_qsequence_if_succeeded
+    if self.success == true and self.success != self.success_was
+      # xxx qsequence should be sequential as advertised; for now at
+      # least it's non-decreasing.
+      JobTask.update_all(['qsequence = ?', Time.now.to_f*1000000],
+                         ['created_by_job_task = ?', self.uuid])
+    end
+  end
 end
diff --git a/services/crunch/crunch-job b/services/crunch/crunch-job
index e884a449c4..861ac6c8e2 100755
--- a/services/crunch/crunch-job
+++ b/services/crunch/crunch-job
@@ -3,48 +3,46 @@
 =head1 NAME
 
-whjobmanager: Execute job steps, save snapshots as requested, collate output.
+crunch-job: Execute job steps, save snapshots as requested, collate output. =head1 SYNOPSIS -Obtain job details from database, run tasks on compute nodes -(typically invoked by scheduler on cloud controller): +Obtain job details from Arvados, run tasks on compute nodes (typically +invoked by scheduler on controller): - whjobmanager jobid + crunch-job --uuid x-y-z Obtain job details from command line, run tasks on local machine (typically invoked by application or developer on VM): - whjobmanager revision=PATH mrfunction=FUNC inputkey=LOCATOR \ - [stepspernode=N] [SOMEKNOB=value] ... + crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}' =head1 RUNNING JOBS LOCALLY -whjobmanager(1p)'s log messages appear on stderr, and are saved in the -warehouse at each checkpoint and when the job finishes. +crunch-job's log messages appear on stderr along with the job tasks' +stderr streams. The log is saved in Keep at each checkpoint and when +the job finishes. If the job succeeds, the job's output locator is printed on stdout. -If a job step outputs anything to stderr, it appears in -whjobmanager(1p)'s log when the step finishes. - While the job is running, the following signals are accepted: =over =item control-C, SIGINT, SIGQUIT -Save a checkpoint, terminate any job steps that are running, and stop. +Save a checkpoint, terminate any job tasks that are running, and stop. =item SIGALRM Save a checkpoint and continue. -=back +=item SIGHUP -=head1 SEE ALSO +Refresh node allocation (i.e., check whether any nodes have been added +or unallocated). Currently this is a no-op. -whintro(1p), wh(1p) +=back =cut @@ -53,91 +51,93 @@ use strict; use DBI; use POSIX ':sys_wait_h'; use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); +use Arvados; +use Getopt::Long; use Warehouse; use Warehouse::Stream; $ENV{"TMPDIR"} ||= "/tmp"; - -do '/etc/warehouse/warehouse-server.conf'; +$ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job"; +$ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . 
"/work"; +mkdir ($ENV{"CRUNCH_TMP"}); + +my $force_unlock; +my $jobspec; +my $resume_stash; +GetOptions('force-unlock' => \$force_unlock, + 'job=s' => \$jobspec, + 'resume-stash=s' => \$resume_stash, + ); my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST}; -my $have_database = @ARGV == 1 && $ARGV[0] =~ /^\d+$/; +my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/; +$SIG{'HUP'} = sub +{ + 1; +}; $SIG{'USR1'} = sub { - $main::ENV{MR_DEBUG} = 1; + $main::ENV{CRUNCH_DEBUG} = 1; }; $SIG{'USR2'} = sub { - $main::ENV{MR_DEBUG} = 0; + $main::ENV{CRUNCH_DEBUG} = 0; }; -my $whc = new Warehouse or croak ("failed to create Warehouse client"); -my $metastream = new Warehouse::Stream (whc => $whc); +my $arv = Arvados->new; +my $metastream = Warehouse::Stream->new; $metastream->clear; -$metastream->name ("."); -$metastream->write_start ("log.txt"); - - +$metastream->write_start('log.txt'); +my $User = {}; my $Job = {}; my $job_id; my $dbh; my $sth; -if ($have_database) +if ($job_has_uuid) { - ($job_id) = @ARGV; - - $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN); - croak ($DBI::errstr) if !$dbh; - $dbh->{InactiveDestroy} = 1; - - $sth = $dbh->prepare ("select * from mrjob where id=?"); - $sth->execute ($job_id) or croak ($dbh->errstr); - $Job = $sth->fetchrow_hashref or croak ($sth->errstr); + $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + $User = $arv->{'users'}->{'current'}->execute; + if (!$force_unlock) { + if ($Job->{'is_locked_by'}) { + croak("Job is locked: " . $Job->{'is_locked_by'}); + } + if ($Job->{'success'} ne undef) { + croak("Job 'success' flag (" . $Job->{'success'} . ") is not null"); + } + if ($Job->{'running'}) { + croak("Job 'running' flag is already set"); + } + if ($Job->{'started_at'}) { + croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . 
")"); + } + } } - else { - my %knob; - foreach (@ARGV) - { - if (/([a-z].*?)=(.*)/) { - $Job->{$1} = $2; - } elsif (/(.*?)=(.*)/) { - $knob{$1} = $2; - } - } - $Job->{knobs} = join ("\n", map { "$_=$knob{$_}" } sort keys %knob); + $Job = JSON::decode_json($jobspec); - if (!$Job->{thawedfromkey}) + if (!$resume_stash) { map { croak ("No $_ specified") unless $Job->{$_} } - qw(mrfunction revision inputkey); + qw(script script_version script_parameters); } - if (!defined $Job->{id}) { - chomp ($Job->{id} = sprintf ("%d.%d\@%s", time, $$, `hostname`)); + if (!defined $Job->{'uuid'}) { + chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$)); } - $job_id = $Job->{id}; } +$job_id = $Job->{'uuid'}; -$Job->{inputkey} = $Job->{input0} if !exists $Job->{inputkey}; -delete $Job->{input0}; - - - -my $max_ncpus; -map { $max_ncpus = $1 if /^STEPSPERNODE=(.*)/ } split ("\n", $$Job{knobs}); -$max_ncpus = $1 if $$Job{nodes} =~ /\&(\d+)/; -$max_ncpus = $$Job{stepspernode} if $$Job{stepspernode}; -my $maxstepspernode; - +$Job->{'resource_limits'} ||= {}; +$Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0; +my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'}; Log (undef, "check slurm allocation"); @@ -157,8 +157,7 @@ if (exists $ENV{SLURM_NODELIST}) foreach (@sinfo) { my ($ncpus, $slurm_nodelist) = split; - $ncpus = $max_ncpus if defined ($max_ncpus) && $ncpus > $max_ncpus && $max_ncpus > 0; - $maxstepspernode = $ncpus if !defined $maxstepspernode || $maxstepspernode < $ncpus; + $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus; my @nodelist; while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//) @@ -218,35 +217,20 @@ foreach (@sinfo) my $jobmanager_id; -if ($have_database) +if ($job_has_uuid) { # Claim this job, and make sure nobody else does - $sth = $dbh->prepare ("insert into mrjobmanager - (pid, revision, starttime) - values (?, ?, now())"); - my $rev = q/$Revision$/; - $rev =~ /\d+/; - $sth->execute ($$, +$&) or croak 
($dbh->errstr);
-
-  $sth = $dbh->prepare ("select last_insert_id()");
-  $sth->execute or croak ($dbh->errstr);
-  ($jobmanager_id) = $sth->fetchrow_array;
-
-  $sth = $dbh->prepare ("update mrjob set jobmanager_id=?, starttime=now()
-                         where id=? and jobmanager_id is null");
-  $sth->execute ($jobmanager_id, $job_id) or croak ($dbh->errstr);
-
-  $sth = $dbh->prepare ("select jobmanager_id from mrjob
-                         where id=?");
-  $sth->execute ($job_id) or croak ($dbh->errstr);
-  my ($check_jobmanager_id) = $sth->fetchrow_array;
-  if ($check_jobmanager_id != $jobmanager_id)
-  {
-    # race condition - another job manager proc stole the job
-    Log (undef,
-         "job taken by jobmanager id $check_jobmanager_id");
-    exit (1);
+  $Job->{'is_locked_by'} = $User->{'uuid'};
+  $Job->{'started_at'} = time;
+  $Job->{'running'} = 1;
+  $Job->{'success'} = undef;
+  $Job->{'tasks_summary'} = { 'failed' => 0,
+                              'todo' => 1,
+                              'running' => 0,
+                              'done' => 0 };
+  unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) { # uuids are strings; == would numify both sides and match any two non-numeric uuids
+    croak("Error while updating / locking job");
   }
 }
@@ -264,7 +248,7 @@ $main::please_continue = 0;
 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
-$ENV{"MR_JOB_ID"} = $job_id;
+$ENV{"CRUNCH_JOB_UUID"} = $job_id;
 $ENV{"JOB_UUID"} = $job_id;
@@ -285,139 +269,46 @@ if (defined $Job->{thawedfromkey})
 }
 else
 {
-  push @jobstep, { input => $Job->{inputkey},
-                   level => 0,
+  my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
+    'job_uuid' => $Job->{'uuid'},
+    'sequence' => 0,
+    'qsequence' => 0,
+    'parameters' => {},
+                                                          });
+  push @jobstep, { level => 0,
                    attempts => 0,
+                   arvados_task => $first_task,
                  };
   push @jobstep_todo, 0;
 }
-
-mkdir ($ENV{"TMPDIR"}."/mrcompute");
-if ($$Job{knobs} =~ /^GPG_KEYS=(.*)/m) {
-  # set up a fresh gnupg directory just for this process
-  # TODO: reap abandoned gnupg dirs
-  system ("rm", "-rf", $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$");
-  mkdir 
($ENV{"TMPDIR"}."/mrcompute"); - mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg", 0700); - mkdir ($ENV{"TMPDIR"}."/mrcompute/.gnupg/$$", 0700) || croak ("mkdir: $!"); - - my $newhomedir = $ENV{"TMPDIR"}."/mrcompute/.gnupg/$$"; - - open C, ">", $newhomedir."/gpg.conf"; - print C "always-trust\n"; - close C; - - # import secret keys referenced in job spec - my $hashes = $1; - $hashes =~ s/\'/\'\\\'\'/g; - my $gpg_out = `whget '$hashes' - | gpg --homedir "$newhomedir" --import 2>&1`; - my %encrypt_to; - while ($gpg_out =~ /^gpg: key ([0-9A-F]{8}): /gm) { - my $keynum = $1; - while (`gpg --homedir "$newhomedir" --list-keys "$keynum"` =~ /^uid\s.*<(.+?)>/gm) { - $encrypt_to{$1} = 1; - } - } - if (!%encrypt_to) { - croak ("GPG_KEYS provided but failed to import keys:\n$gpg_out"); - } - - if ($have_database) { - - # make sure the job request was signed by all of the secret keys - # contained in GPG_KEYS (otherwise, any VM can just copy the - # GPG_KEYS hash from an existing mr-job and submit new jobs that can - # read private data) - - my %did_not_sign; - my $seckeys = `gpg --homedir "$newhomedir" --list-secret-keys --with-fingerprint`; - while ($seckeys =~ /Key fingerprint.*?([0-9A-F][0-9A-F ]+[0-9A-F])/mgi) { - $did_not_sign{$1} = 1; - } - my $srfile = "$newhomedir/signedrequest"; - open SREQ, ">", $srfile; - print SREQ $$Job{"signedrequest"}; - close SREQ; - my $gpg_v = `gpg --homedir "$newhomedir" --verify --with-fingerprint "$srfile" 2>&1 && echo ok`; - unlink $srfile; - if ($gpg_v =~ /\nok\n$/s) { - while ($gpg_v =~ /Good signature.*? key fingerprint: (\S[^\n]+\S)/sgi) { - delete $did_not_sign{$1}; - } - } - if (%did_not_sign) { - croak (join ("\n", - "Some secret keys provided did not sign this job request:", - keys %did_not_sign) . 
"\n"); - } - } - - my $hostname = `hostname`; - chomp ($hostname); - - # tell mrjobsteps the decrypted secret key(s) and all public key(s) they might need - $ENV{"GPG_KEYS"} = `gpg --homedir "$newhomedir" --export-secret-keys --armor`; - $ENV{"GPG_PUBLIC_KEYS"} = `gpg --export --armor | ENCRYPT_TO= whput -`; - - # import all secret keys from my real home dir - `gpg --export-secret-keys | gpg --homedir "$newhomedir" --import 2>&1`; - - # use the new gnupg dir from now on - $ENV{"GNUPGHOME"} = $newhomedir; - - # if I have a secret key for root@{host} or {user}@{host} or - # {configured-controller-gpg-uid}, add that as a recipient too so - # I'll be able to read frozentokeys etc. later - my %allkeys; - while (`gpg --list-secret-keys` =~ /^uid\s.*?<(.+?)>/gm) { - $allkeys{$1} = 1; - } - my $encrypting_to_self = 0; - my @try_these_uids = ("root\@".$hostname, $ENV{"USER"}."\@".$hostname); - push @try_these_uids, ($whc->{config}->{controller_gpg_uid}) - if exists $whc->{config}->{controller_gpg_uid}; - foreach my $id (@try_these_uids) { - if (exists $allkeys{$id}) { - $encrypt_to{$id} = 1; - $encrypting_to_self = 1; - last; - } - } - - if (!$encrypting_to_self) { - croak ("Failed to find a secret key for any of [@try_these_uids] -- giving up instead of writing meta/freeze data that I won't be able to read"); - } - - # tell the client library (and child procs and jobsteps) to encrypt using these keys - $ENV{"ENCRYPT_TO"} = join (",", sort keys %encrypt_to); - Log (undef, "encrypt_to ('".$ENV{"ENCRYPT_TO"}."')"); - $whc->set_config ("encrypt_to", $ENV{"ENCRYPT_TO"}); -} - +my $build_script; +do { + local $/ = undef; + $build_script = ; +}; -$ENV{"MR_REVISION"} = $Job->{revision}; +$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision}; -my $git_build_script; my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/}); if ($skip_install) { - $ENV{"MR_REVISION_SRCDIR"} = $Job->{revision}; + $ENV{"CRUNCH_SRC"} = $Job->{revision}; } else { Log (undef, "Install revision 
".$Job->{revision}); my $nodelist = join(",", @node); - # Clean out mrcompute/work and mrcompute/opt + # Clean out crunch_tmp/work and crunch_tmp/opt my $cleanpid = fork(); if ($cleanpid == 0) { - srun (["srun", "--nodelist=$nodelist", "-D", $ENV{TMPDIR}], - ['bash', '-c', 'if mount | grep -q $TMPDIR/mrcompute/work/; then sudo /bin/umount $TMPDIR/mrcompute/work/* 2>/dev/null; fi; sleep 1; rm -rf $TMPDIR/mrcompute/work $TMPDIR/mrcompute/opt']); + srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], + ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']); exit (1); } while (1) @@ -430,33 +321,34 @@ else # Install requested code version - my $build_script; my @execargs; my @srunargs = ("srun", "--nodelist=$nodelist", - "-D", $ENV{TMPDIR}, "--job-name=$job_id"); + "-D", $ENV{'TMPDIR'}, "--job-name=$job_id"); - $ENV{"MR_REVISION"} = $Job->{revision}; - $ENV{"MR_REVISION_SRCDIR"} = "$ENV{TMPDIR}/mrcompute/warehouse-apps"; - $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/opt"; + $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version}; + $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src"; + $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt"; my $commit; - my $treeish = $Job->{revision}; - my $repo = $Job->{git_clone_url} || $whc->get_config("git_clone_url"); + my $treeish = $Job->{'script_version'}; + my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'}; + # Todo: let script_version specify alternate repo + $ENV{"CRUNCH_SRC_URL"} = $repo; # Create/update our clone of the remote git repo - if (!-d $ENV{MR_REVISION_SRCDIR}) { - system(qw(git clone), $repo, $ENV{MR_REVISION_SRCDIR}) == 0 + if (!-d $ENV{"CRUNCH_SRC"}) { + system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0 or croak ("git clone $repo failed: exit ".($?>>8)); - system("cd $ENV{MR_REVISION_SRCDIR} && git config clean.requireForce false"); + system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false"); } - `cd 
$ENV{MR_REVISION_SRCDIR} && git fetch -q`; + `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`; # If this looks like a subversion r#, look for it in git-svn commit messages if ($treeish =~ m{^\d{1,4}$}) { - my $gitlog = `cd $ENV{MR_REVISION_SRCDIR} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`; + my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`; chomp $gitlog; if ($gitlog =~ /^[a-f0-9]{40}$/) { $commit = $gitlog; @@ -475,7 +367,7 @@ else $cooked_treeish = "origin/$treeish"; } - my $found = `cd $ENV{MR_REVISION_SRCDIR} && git rev-list -1 $cooked_treeish`; + my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`; chomp $found; if ($found =~ /^[0-9a-f]{40}$/s) { $commit = $found; @@ -484,39 +376,20 @@ else # frozentokey, logs, etc. -- instead of an abbreviation or a # branch name which can become ambiguous or point to a # different commit in the future. - $ENV{"MR_REVISION"} = $commit; - $Job->{revision} = $commit; - dbh_do - ("update mrjob set revision=? 
where id=?", - undef, - $Job->{revision}, $Job->{id}); + $ENV{"CRUNCH_SRC_COMMIT"} = $commit; Log (undef, "Using commit $commit for tree-ish $treeish"); + if ($commit ne $treeish) { + $Job->{'script_version'} = $commit; + $Job->save() or croak("Error while updating job"); + } } } } if (defined $commit) { - $ENV{"MR_GIT_COMMIT"} = $commit; - $ENV{"MR_GIT_CLONE_URL"} = $repo; - @execargs = ("sh", "-c", - "mkdir -p $ENV{TMPDIR}/mrcompute/opt && cd $ENV{TMPDIR}/mrcompute && perl - $ENV{MR_REVISION_SRCDIR} $commit $repo"); - open GBS, "<", `echo -n \$(which whjob-checkout-and-build)` - or croak ("can't find whjob-checkout-and-build"); - local $/ = undef; - $git_build_script = ; - close GBS; - $build_script = $git_build_script; - } - elsif ($treeish =~ m{^(\d{1,5})$}) { - # Want a subversion r# but couldn't find it in git-svn history - - # might as well try using the subversion repo in case it's still - # there. - $ENV{"INSTALL_REPOS"} = $whc->get_config("svn_root"); - $ENV{"INSTALL_REVISION"} = $Job->{revision}; - $ENV{"MR_REVISION_INSTALLDIR"} = "$ENV{TMPDIR}/mrcompute/revision/$treeish"; - $ENV{"MR_REVISION_SRCDIR"} = "$ENV{MR_REVISION_INSTALLDIR}/src"; + $ENV{"CRUNCH_SRC_COMMIT"} = $commit; @execargs = ("sh", "-c", - "mkdir -p $ENV{TMPDIR}/mrcompute/revision && cd $ENV{TMPDIR}/mrcompute && ( [ -e $ENV{MR_REVISION_INSTALLDIR}/.tested ] || ( svn export --quiet \"\$INSTALL_REPOS/installrevision\" && INSTALLREVISION_NOLOCK=1 ./installrevision ) )"); + "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); } else { croak ("could not figure out commit id for $treeish"); @@ -539,7 +412,7 @@ else -foreach (qw (mrfunction revision nodes stepspernode inputkey)) +foreach (qw (script script_version script_parameters resource_limits)) { Log (undef, $_ . " " . 
$Job->{$_}); } @@ -571,7 +444,6 @@ my %proc; my @freeslot = (0..$#slot); my @holdslot; my %reader; -my ($id, $input, $attempts); my $progress_is_dirty = 1; my $progress_stats_updated = 0; @@ -624,69 +496,56 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) undef $dbh; undef $sth; - delete $ENV{"GNUPGHOME"}; - $ENV{"MR_ID"} = $id; - $ENV{"MR_INPUT"} = $Jobstep->{input}; - $ENV{"MR_KNOBS"} = $Job->{knobs}; - $ENV{"MR_LEVEL"} = $level; - $ENV{"MR_FUNCTION"} = $Job->{mrfunction}; - $ENV{"MR_INPUT0"} = $Job->{inputkey}; - $ENV{"MR_INPUTKEY"} = $Job->{inputkey}; - $ENV{"MR_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; - $ENV{"MR_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; - $ENV{"MR_SLOT"} = $slot[$childslot]->{cpu}; # deprecated - $ENV{"MR_JOB_TMP"} = $ENV{"TMPDIR"}."/job/work"; - $ENV{"MR_JOBSTEP_TMP"} = $ENV{"TMPDIR"}."/job/work/".$slot[$childslot]->{cpu}; - $ENV{"MR_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; - $ENV{"MOGILEFS_TRACKERS"} = join (",", @main::mogilefs_trackers); - $ENV{"MOGILEFS_DOMAIN"} = $main::mogilefs_default_domain; - $ENV{"MOGILEFS_CLASS"} = $main::mogilefs_default_class; - - $ENV{"TASK_UUID"} = $ENV{"JOB_UUID"} . "-" . 
$id; + $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'}; $ENV{"TASK_QSEQUENCE"} = $id; - $ENV{"TASK_SEQUENCE"} = $Jobstep->{level}; + $ENV{"TASK_SEQUENCE"} = $level; + $ENV{"JOB_SCRIPT"} = $Job->{script}; + while (my ($param, $value) = each %{$Job->{script_parameters}}) { + $param =~ tr/a-z/A-Z/; + $ENV{"JOB_PARAMETER_$param"} = $value; + } + $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; + $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; + $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu}; + $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; $ENV{"GZIP"} = "-n"; my @srunargs = ( "srun", "--nodelist=".$childnode->{name}, - qw(-n1 -c1 -N1 -D), $ENV{TMPDIR}, + qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'}, "--job-name=$job_id.$id.$$", ); my @execargs = qw(sh); - my $script = ""; + my $build_script_to_send = ""; my $command = - "mkdir -p $ENV{TMPDIR}/mrcompute/revision " - ."&& cd $ENV{TMPDIR}/mrcompute "; - if ($git_build_script) + "mkdir -p $ENV{CRUNCH_TMP}/revision " + ."&& cd $ENV{CRUNCH_TMP} "; + if ($build_script) { - $script = $git_build_script; + $build_script_to_send = $build_script; $command .= - "&& perl - $ENV{MR_REVISION_SRCDIR} $ENV{MR_GIT_COMMIT} $ENV{MR_GIT_CLONE_URL}"; + "&& perl -"; } elsif (!$skip_install) { $command .= "&& " ."( " - ." [ -e '$ENV{MR_REVISION_INSTALLDIR}/.tested' ] " + ." [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] " ."|| " ." ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' " ." && ./installrevision " ." 
) " .") "; } - if (exists $ENV{GPG_KEYS}) { - $command .= - "&& mkdir -p '$ENV{MR_JOBSTEP_TMP}' && (sudo /bin/umount '$ENV{MR_JOBSTEP_TMP}' 2>/dev/null || true) && rm -rf '$ENV{MR_JOBSTEP_TMP}' && exec $ENV{MR_REVISION_SRCDIR}/mapreduce/ecryptfs-wrapper -d '$ENV{MR_JOBSTEP_TMP}' -p $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager"; - } else { - $command .= - "&& exec $ENV{MR_REVISION_SRCDIR}/mapreduce/mrtaskmanager"; - } + $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack + $command .= + "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; my @execargs = ('bash', '-c', $command); - srun (\@srunargs, \@execargs, undef, $script); + srun (\@srunargs, \@execargs, undef, $build_script_to_send); exit (1); } close("writer"); @@ -711,7 +570,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $Jobstep->{node} = $childnode->{name}; $Jobstep->{slotindex} = $childslot; delete $Jobstep->{stderr}; - delete $Jobstep->{output}; delete $Jobstep->{finishtime}; splice @jobstep_todo, $todo_ptr, 1; @@ -800,15 +658,6 @@ update_progress_stats(); freeze_if_want_freeze(); -if (@jobstep_tomerge && !@jobstep_todo) -{ - push @jobstep, { input => join ("\n", splice @jobstep_tomerge, 0), - level => $jobstep_tomerge_level, - attempts => 0 }; - push @jobstep_todo, $#jobstep; -} - - if (!defined $success) { if (@jobstep_todo && @@ -830,48 +679,23 @@ goto ONELEVEL if !defined $success; release_allocation(); freeze(); -my $key = &collate_output(); -$success = 0 if !$key; - +$Job->{'output'} = &collate_output(); +$Job->{'success'} = 0 if !$Job->{'output'}; +$Job->save; -if ($key) +if ($Job->{'output'}) { - my @keepkey; - foreach my $hash (split ",", $key) - { - my $keephash = $whc->store_in_keep (hash => $hash, - nnodes => 3); - if (!$keephash) - { - Log (undef, "store_in_keep (\"$hash\") failed: ".$whc->errstr); - $keephash = $hash; - } - push @keepkey, $keephash; - } - my $keepkey = join (",", @keepkey); - Log (undef, "outputkey+K $keepkey"); - print 
"$keepkey\n" if $success; - - if ($output_in_keep) - { - $key = $keepkey; - } - - dbh_do ("update mrjob set output=? where id=?", undef, - $key, $job_id) - or croak ($dbh->errstr); - - $whc->store_manifest_by_name ($keepkey, undef, "/job$job_id") - or Log (undef, "store_manifest_by_name (\"$key\", \"/job$job_id\") failed: ".$whc->errstr); + $arv->{'collections'}->{'create'}->execute('collection' => { + 'uuid' => $Job->{'output'}, + 'manifest_text' => system("whget", $Job->{arvados_task}->{output}), + });; } Log (undef, "finish"); -dbh_do ("update mrjob set finishtime=now(), success=? - where id=? and jobmanager_id=?", undef, - $success, $job_id, $jobmanager_id) - or croak ($dbh->errstr); +$Job->{'success'} = $Job->{'output'} && $success; +$Job->save; save_meta(); exit 0; @@ -885,10 +709,11 @@ sub update_progress_stats my ($todo, $done, $running) = (scalar @jobstep_todo, scalar @jobstep_done, scalar @slot - scalar @freeslot - scalar @holdslot); - dbh_do - ("update mrjob set steps_todo=?,steps_done=?,steps_running=? 
where id=?", - undef, - $todo, $done, $running, $job_id); + $Job->{'tasks_summary'} ||= {}; + $Job->{'tasks_summary'}->{'todo'} = $todo; + $Job->{'tasks_summary'}->{'done'} = $done; + $Job->{'tasks_summary'}->{'running'} = $running; + $Job->save; Log (undef, "status: $done done, $running running, $todo todo"); $progress_is_dirty = 0; } @@ -907,24 +732,21 @@ sub reapchildren my $elapsed = time - $proc{$pid}->{time}; my $Jobstep = $jobstep[$jobstepid]; - process_stderr_for_output_key ($jobstepid); - my $exitcode = $?; my $exitinfo = "exit $exitcode"; - if (!exists $Jobstep->{output}) - { - $exitinfo .= " with no output key"; - $exitcode = -1 if $exitcode == 0 && $jobsteps_must_output_keys; - } + $Jobstep->{arvados_task}->reload; + my $success = $Jobstep->{arvados_task}->{success}; - if ($exitcode == 0 && $Jobstep->{node_fail}) { - $exitinfo .= " but recording as failure"; - $exitcode = -1; + if (!defined $success) { + # task did not indicate one way or the other --> fail + $Jobstep->{arvados_task}->{success} = 0; + $Jobstep->{arvados_task}->save; + $success = 0; } - Log ($jobstepid, "child $pid on $whatslot $exitinfo"); + Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success"); - if ($exitcode != 0 || $Jobstep->{node_fail}) + if (!$success) { --$Jobstep->{attempts} if $Jobstep->{node_fail}; ++$thisround_failed; @@ -946,6 +768,7 @@ sub reapchildren push @jobstep_todo, $jobstepid; Log ($jobstepid, "failure in $elapsed seconds"); + $Job->{'tasks_summary'}->{'failed'}++; } else { @@ -957,8 +780,8 @@ sub reapchildren } $Jobstep->{exitcode} = $exitcode; $Jobstep->{finishtime} = time; - process_stderr ($jobstepid, $exitcode == 0); - Log ($jobstepid, "output $$Jobstep{output}"); + process_stderr ($jobstepid, $success); + Log ($jobstepid, "output " . 
$Jobstep->{arvados_task}->{output}); close $reader{$jobstepid}; delete $reader{$jobstepid}; @@ -966,6 +789,20 @@ sub reapchildren push @freeslot, $proc{$pid}->{slot}; delete $proc{$pid}; + # Load new tasks + my $newtask_list = $arv->{'job_tasks'}->{'index'}->execute('where' => { + 'created_by_job_task' => $Jobstep->{arvados_task}->{uuid} + }); + foreach my $arvados_task (@{$newtask_list->{'items'}}) { + my $jobstep = { + 'level' => $arvados_task->{'sequence'}, + 'attempts' => 0, + 'arvados_task' => $arvados_task + }; + push @jobstep, $jobstep; + push @jobstep_todo, $#jobstep; + } + $progress_is_dirty = 1; 1; } @@ -1057,11 +894,10 @@ sub readfrompipes my $buf; while (0 < sysread ($reader{$job}, $buf, 8192)) { - print STDERR $buf if $ENV{MR_DEBUG}; + print STDERR $buf if $ENV{CRUNCH_DEBUG}; $jobstep[$job]->{stderr} .= $buf; preprocess_stderr ($job); - if (length ($jobstep[$job]->{stderr}) > 16384 && - $jobstep[$job]->{stderr} !~ /\+\+\+mr/) + if (length ($jobstep[$job]->{stderr}) > 16384) { substr ($jobstep[$job]->{stderr}, 0, 8192) = ""; } @@ -1072,30 +908,10 @@ sub readfrompipes } -sub process_stderr_for_output_key -{ - my $job = shift; - while ($jobstep[$job]->{stderr} =~ s/\+\+\+mrout (.*?)\+\+\+\n//s) - { - $jobstep[$job]->{output} = $1; - $jobsteps_must_output_keys = 1; - } -} - - sub preprocess_stderr { my $job = shift; - $jobstep[$job]->{stderr_jobsteps} = [] - if !exists $jobstep[$job]->{stderr_jobsteps}; - - $jobstep[$job]->{stderr} =~ - s{\+\+\+mrjobstep((\/(\d+|\*))? 
(\d+) (.*?))\+\+\+\n}{ - push (@{ $jobstep[$job]->{stderr_jobsteps} }, $1); - ""; - }gse; - while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) { my $line = $1; if ($line =~ /\+\+\+mr/) { @@ -1124,55 +940,19 @@ sub process_stderr map { Log ($job, "stderr $_"); } split ("\n", $jobstep[$job]->{stderr}); - - if (!$success || !exists $jobstep[$job]->{stderr_jobsteps}) - { - delete $jobstep[$job]->{stderr_jobsteps}; - return; - } - - foreach (@{ $jobstep[$job]->{stderr_jobsteps} }) - { - /^(?:\/(\d+|\*))? (\d+) (.*)/s; - my ($merge, $level, $input) = ($1, $2, $3); - my $newjobref; - if ($merge) - { - push @jobstep_tomerge, $input; - $jobstep_tomerge_level = $level; - if ($merge !~ /\D/ && @jobstep_tomerge >= $merge) - { - $newjobref = { input => join ("\n", - splice @jobstep_tomerge, 0, $merge), - level => $level, - attempts => 0 }; - } - } - else - { - $newjobref = { input => $input, - level => $level, - attempts => 0 }; - } - if ($newjobref) - { - push @jobstep, $newjobref; - push @jobstep_todo, $#jobstep; - } - } - delete $jobstep[$job]->{stderr_jobsteps}; } sub collate_output { + my $whc = Warehouse->new; Log (undef, "collate"); $whc->write_start (1); - my $key; + my $joboutput; for (@jobstep) { - next if !exists $_->{output} || $_->{exitcode} != 0; - my $output = $_->{output}; + next if !exists $_->{arvados_task}->{output} || $_->{exitcode} != 0; + my $output = $_->{arvados_task}->{output}; if ($output !~ /^[0-9a-f]{32}/) { $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/; @@ -1180,7 +960,7 @@ sub collate_output } elsif (@jobstep == 1) { - $key = $output; + $joboutput = $output; $whc->write_finish; } elsif (defined (my $outblock = $whc->fetch_block ($output))) @@ -1195,19 +975,18 @@ sub collate_output $success = 0; } } - $key = $whc->write_finish if !defined $key; - if ($key) + $joboutput = $whc->write_finish if !defined $joboutput; + if ($joboutput) { - Log (undef, "outputkey $key"); - dbh_do ("update mrjob set output=? 
where id=?", undef, - $key, $job_id) - or Log (undef, "db update failed: ".$DBI::errstr); + Log (undef, "outputkey $joboutput"); + $Job->{'output'} = $joboutput; + $Job->save; } else { Log (undef, "outputkey undef"); } - return $key; + return $joboutput; } @@ -1279,7 +1058,7 @@ sub Log # ($jobstep_id, $logmessage) sub reconnect_database { - return if !$have_database; + return if !$job_has_uuid; return if ($dbh && $dbh->do ("select now()")); for (1..16) { @@ -1297,7 +1076,7 @@ sub reconnect_database sub dbh_do { - return 1 if !$have_database; + return 1 if !$job_has_uuid; my $ret = $dbh->do (@_); return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/); reconnect_database(); @@ -1320,7 +1099,7 @@ sub croak sub cleanup { - return if !$have_database || !$dbh; + return if !$job_has_uuid || !$dbh; reconnect_database(); my $sth; @@ -1333,17 +1112,15 @@ sub cleanup sub save_meta { - reconnect_database(); my $justcheckpoint = shift; # false if this will be the last meta saved my $m = $metastream; $m = $m->copy if $justcheckpoint; $m->write_finish; - my $key = $m->as_key; + my $loglocator = $m->as_key; undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it - Log (undef, "meta key is $key"); - dbh_do ("update mrjob set metakey=? where id=?", - undef, - $key, $job_id); + Log (undef, "meta key is $loglocator"); + $Job->{'log'} = $loglocator; + $Job->save; } @@ -1378,6 +1155,10 @@ sub freeze_if_want_freeze sub freeze { + Log (undef, "Freeze not implemented"); + return; + + my $whc; # todo Log (undef, "freeze"); my $freezer = new Warehouse::Stream (whc => $whc); @@ -1386,7 +1167,7 @@ sub freeze $freezer->write_start ("state.txt"); $freezer->write_data (join ("\n", - "job $Job->{id}", + "job $Job->{uuid}", map { $_ . "=" . 
freezequote($Job->{$_})
@@ -1426,6 +1207,9 @@ sub freeze
 sub thaw
 {
+  croak ("Thaw not implemented");
+
+  my $whc;
   my $key = shift;
   Log (undef, "thaw from $key");
@@ -1478,20 +1262,11 @@ sub thaw
     }
   }
-  foreach (qw (mrfunction revision inputkey knobs))
+  foreach (qw (script script_version script_parameters))
   {
     $Job->{$_} = $frozenjob->{$_};
   }
-  dbh_do
-      ("update mrjob
-        set mrfunction=?, revision=?, input0=?, knobs=?
-        where id=?", undef,
-       $Job->{mrfunction},
-       $Job->{revision},
-       $Job->{inputkey},
-       $Job->{knobs},
-       $Job->{id},
-      );
+  $Job->save;
 }
@@ -1523,7 +1298,7 @@ sub srun
                   map { / / ? "'$_'" : $_ }
                   (@$args)),
          "\n")
-      if $ENV{MR_DEBUG};
+      if $ENV{CRUNCH_DEBUG};
   if (defined $stdin) {
     my $child = open STDIN, "-|";
@@ -1549,3 +1324,104 @@ sub ban_node_by_slot {
   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
+
+__DATA__
+#!/usr/bin/perl
+
+# checkout-and-build
+#
+# Check out $CRUNCH_SRC_COMMIT from $CRUNCH_SRC_URL into $CRUNCH_SRC, then
+# run the tree's install.sh (tests/autotests.sh in old trees), passing
+# $CRUNCH_INSTALL (default: opt/ under the current directory).  Once a build
+# is needed, stdout and stderr are redirected to "$CRUNCH_SRC.log".  The
+# symlink "$CRUNCH_SRC.commit" names the last commit built; a match exits 0.
+
+use strict;
+use warnings;
+use Fcntl ':flock';
+
+my $destdir = $ENV{"CRUNCH_SRC"};
+my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
+my $repo = $ENV{"CRUNCH_SRC_URL"};
+defined $destdir && length $destdir or die "CRUNCH_SRC is not set";
+$commit = "" if !defined $commit;
+$repo = "" if !defined $repo;
+
+# Hold an exclusive lock for the whole run so concurrent invocations on
+# the same tree take turns.
+open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
+flock L, LOCK_EX or die "flock $destdir.lock: $!";
+
+# A missing symlink compares equal to an empty commit, so with no commit
+# requested and nothing recorded this is a no-op, as before.
+my $built = readlink ("$destdir.commit");
+$built = "" if !defined $built;
+if ($built eq $commit) {
+    exit 0;
+}
+
+open STDOUT, ">", "$destdir.log" or die "$destdir.log: $!";
+open STDERR, ">&STDOUT" or die "dup STDOUT: $!";
+
+if (-d "$destdir/.git") {
+    chdir $destdir or die "chdir $destdir: $!";
+    if (0 != system (qw(git remote set-url origin), $repo)) {
+        # Old versions of git don't know "remote set-url"; set the config
+        # key directly.  List-form system() keeps $repo away from the shell.
+        shell_or_die (qw(git config remote.origin.url), $repo);
+    }
+}
+elsif ($repo && $commit)
+{
+    shell_or_die('git', 'clone', $repo, $destdir);
+    chdir $destdir or die "chdir $destdir: $!";
+    shell_or_die(qw(git config clean.requireForce false));
+}
+else {
+    die "$destdir does not exist, and no repo/commit specified -- giving up";
+}
+
+if ($commit) {
+    unlink "$destdir.commit";
+    shell_or_die (qw(git stash));
+    # -f: a pre-existing clone may not have clean.requireForce=false.
+    shell_or_die (qw(git clean -f -d -x));
+    shell_or_die (qw(git fetch origin));
+    shell_or_die (qw(git checkout), $commit);
+}
+
+my $pwd;
+chomp ($pwd = `pwd`);
+my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
+mkdir $install_dir;
+if (!-e "./install.sh" && -e "./tests/autotests.sh") {
+    # Old version
+    shell_or_die ("./tests/autotests.sh", $install_dir);
+} elsif (-e "./install.sh") {
+    shell_or_die ("./install.sh", $install_dir);
+}
+
+if ($commit) {
+    # Write the marker under a temporary name, then rename it over the
+    # real one, so the marker is only replaced after the build steps ran.
+    unlink "$destdir.commit.new";
+    symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
+    rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
+}
+
+close L;
+
+exit 0;
+
+# Run a command (same arguments as system()); die unless it exits 0.
+sub shell_or_die
+{
+  if ($ENV{"DEBUG"}) {
+    print STDERR "@_\n";
+  }
+  my $status = system (@_);
+  return 1 if $status == 0;
+  # $! is only meaningful when the command could not be started at all.
+  die "@_ failed to start: $!" if $status == -1;
+  die "@_ failed: exit 0x".sprintf("%x",$status);
+}