h2. Create a new script
-Change to your git directory and create a new script in "crunch_scripts/".
+Change to your git directory and create a new script in @crunch_scripts/@.
<notextile>
<pre><code>~$ <span class="userinput">cd <b>you</b>/crunch_scripts</span>
h2. Using arv-crunch-job to run the job in your VM
-Instead of a git commit hash, we provide the path to the directory in the "script_version" parameter. The script specified in "script" will actually be searched for in the "crunch_scripts/" subdirectory of the directory specified "script_version". Although we are running the script locally, the script still requires access to the Arvados API server and Keep storage service. The job will be recorded in the Arvados job history, and visible in Workbench.
+Instead of a git commit hash, we provide the path to the directory in the "script_version" parameter. The script specified in "script" will actually be searched for in the @crunch_scripts/@ subdirectory of the directory specified by "script_version". Although we are running the script locally, the script still requires access to the Arvados API server and Keep storage service. The job will be recorded in the Arvados job history and will be visible in Workbench.
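
The lookup rule described above can be sketched as a small pre-flight check to run before submitting a local job. This is only an illustration: @resolve_script@ and its two arguments are hypothetical names and not part of Arvados, and the sketch assumes nothing beyond the "script_version directory plus @crunch_scripts/@ plus script name" rule stated in the paragraph.

```shell
#!/bin/sh
# Hypothetical helper (not an Arvados command): work out where a local job's
# script is expected to live and check that it could be executed.
resolve_script() {
    script_version=$1   # path to the working directory, e.g. "$HOME/$USER"
    script=$2           # value of the "script" parameter, e.g. "hello-world.py"
    path="$script_version/crunch_scripts/$script"
    if [ ! -f "$path" ]; then
        echo "missing: $path" >&2
        return 1
    fi
    if [ ! -x "$path" ]; then
        echo "not executable: $path" >&2
        return 2
    fi
    # Print the resolved path on success.
    printf '%s\n' "$path"
}
```

For example, @resolve_script "$HOME/$USER" hello-world.py@ prints the resolved path, or returns a non-zero status with a message on stderr if the file is missing or lacks the execute bit.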
<notextile>
<pre><code>~/<b>you</b>/crunch_scripts$ <span class="userinput">cat >~/the_job <<EOF
{
+ "repository":"",
"script":"hello-world.py",
- "script_version":"/home/<b>you</b>/<b>you</b>",
+ "script_version":"$HOME/$USER",
"script_parameters":{}
}
EOF</span>
-~/<b>you</b>/crunch_scripts</span>$ <span class="userinput">arv-crunch-job --job "$(cat ~/the_job)"</span>
+</code></pre>
+</notextile>
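
The here-document above uses an unquoted @EOF@ delimiter, which is why the shell substitutes variables before the JSON is written to the file. The following sketch contrasts the unquoted and quoted forms; @checkout_dir@ is a placeholder variable standing in for @$HOME/$USER@, and its value is made up for the example.

```shell
#!/bin/sh
# Placeholder standing in for "$HOME/$USER"; the value is illustrative only.
checkout_dir=/home/you/you

# Unquoted delimiter: the shell expands $checkout_dir while reading the text.
expanded=$(cat <<EOF
"script_version":"$checkout_dir"
EOF
)

# Quoted delimiter: the text is passed through literally, with no expansion.
literal=$(cat <<'EOF'
"script_version":"$checkout_dir"
EOF
)

printf '%s\n' "$expanded"
printf '%s\n' "$literal"
```

Quoting the delimiter would leave the literal variable names in the saved JSON, which is not what the job description needs here.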
+
+Your shell should fill in values for @$HOME@ and @$USER@ so that the saved JSON points "script_version" at the directory with your checkout. Now you can run that job:
+
+<notextile>
+<pre><code>~/<b>you</b>/crunch_scripts$ <span class="userinput">arv-crunch-job --job "$(cat ~/the_job)"</span>
2013-12-12_21:36:42 qr1hi-8i9sb-okzukfzkpbrnhst 29827 check slurm allocation
2013-12-12_21:36:42 qr1hi-8i9sb-okzukfzkpbrnhst 29827 node localhost - 1 slots
2013-12-12_21:36:42 qr1hi-8i9sb-okzukfzkpbrnhst 29827 start
2013-12-12_21:36:42 qr1hi-8i9sb-okzukfzkpbrnhst 29827 0 stderr hello world
2013-12-12_21:36:43 qr1hi-8i9sb-okzukfzkpbrnhst 29827 0 child 29834 on localhost.1 exit 0 signal 0 success=
2013-12-12_21:36:43 qr1hi-8i9sb-okzukfzkpbrnhst 29827 0 failure (#1, permanent) after 0 seconds
-2013-12-12_21:36:43 qr1hi-8i9sb-okzukfzkpbrnhst 29827 0 output
+2013-12-12_21:36:43 qr1hi-8i9sb-okzukfzkpbrnhst 29827 0 output
2013-12-12_21:36:43 qr1hi-8i9sb-okzukfzkpbrnhst 29827 Every node has failed -- giving up on this round
2013-12-12_21:36:43 qr1hi-8i9sb-okzukfzkpbrnhst 29827 wait for last 0 children to finish
2013-12-12_21:36:43 qr1hi-8i9sb-okzukfzkpbrnhst 29827 status: 0 done, 0 running, 0 todo
~/<b>you</b>/crunch_scripts$ <span class="userinput">chmod +x hello-world-fixed.py</span>
~/<b>you</b>/crunch_scripts$ <span class="userinput">cat >~/the_job <<EOF
{
+ "repository":"",
"script":"hello-world-fixed.py",
- "script_version":"/home/<b>you</b>/<b>you</b>",
+ "script_version":"$HOME/$USER",
"script_parameters":{}
}
EOF</span>
2013-12-12_21:56:59 qr1hi-8i9sb-79260ykfew5trzl 31578 check slurm allocation
2013-12-12_21:56:59 qr1hi-8i9sb-79260ykfew5trzl 31578 node localhost - 1 slots
2013-12-12_21:57:00 qr1hi-8i9sb-79260ykfew5trzl 31578 start
- 2013-12-12_21:57:00 qr1hi-8i9sb-79260ykfew5trzl 31578 script hello-world.py
+ 2013-12-12_21:57:00 qr1hi-8i9sb-79260ykfew5trzl 31578 script hello-world-fixed.py
2013-12-12_21:57:00 qr1hi-8i9sb-79260ykfew5trzl 31578 script_version /home/<b>you</b>/<b>you</b>
2013-12-12_21:57:00 qr1hi-8i9sb-79260ykfew5trzl 31578 script_parameters {}
2013-12-12_21:57:00 qr1hi-8i9sb-79260ykfew5trzl 31578 runtime_constraints {"max_tasks_per_node":0}
2013-12-12_21:57:02 qr1hi-8i9sb-79260ykfew5trzl 31578 Freeze not implemented
2013-12-12_21:57:02 qr1hi-8i9sb-79260ykfew5trzl 31578 collate
2013-12-12_21:57:02 qr1hi-8i9sb-79260ykfew5trzl 31578 output 576c44d762ba241b0a674aa43152b52a+53
+ WARNING:root:API lookup failed for collection 576c44d762ba241b0a674aa43152b52a+53 (<class 'apiclient.errors.HttpError'>: <HttpError 404 when requesting https://qr1hi.arvadosapi.com/arvados/v1/collections/576c44d762ba241b0a674aa43152b52a%2B53?alt=json returned "Not Found">)
2013-12-12_21:57:03 qr1hi-8i9sb-79260ykfew5trzl 31578 finish
- 2013-12-12_21:57:04 qr1hi-8i9sb-79260ykfew5trzl 31578 meta key is 9f937693334d0c9234ccc1f808ee7117+1761
</code></pre>
</notextile>
+ (The WARNING issued near the end of the script may be safely ignored here; it is the Arvados SDK letting you know that it could not find a collection named @576c44d762ba241b0a674aa43152b52a+53@ and that it is going to try looking up a block by that name instead.)
+
The job succeeded, with output in Keep object @576c44d762ba241b0a674aa43152b52a+53@. Let's look at our output:
<notextile>
#!/usr/bin/perl
-# -*- mode: perl; perl-indent-level: 2; -*-
+# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
=head1 NAME
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Arvados;
use Getopt::Long;
- use Warehouse;
- use Warehouse::Stream;
- use IPC::System::Simple qw(capturex);
+ use IPC::Open2;
+ use IO::Select;
+ use File::Temp;
use Fcntl ':flock';
$ENV{"TMPDIR"} ||= "/tmp";
}
$job_id = $Job->{'uuid'};
- $metastream = Warehouse::Stream->new(whc => new Warehouse);
- $metastream->clear;
- $metastream->name('.');
- $metastream->write_start($job_id . '.log.txt');
-
+ my $keep_logfile = $job_id . '.log.txt';
+ my $local_logfile = File::Temp->new();
$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
or croak ("git clone $repo failed: exit ".($?>>8));
system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
}
- `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
+ `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q --tags origin`;
# If this looks like a subversion r#, look for it in git-svn commit messages
$ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
$ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
$ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
- $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
+ $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
$ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
$ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
$ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
if ($Job->{'output'})
{
eval {
- my $manifest_text = capturex("whget", $Job->{'output'});
+ my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
$arv->{'collections'}->{'create'}->execute('collection' => {
'uuid' => $Job->{'output'},
'manifest_text' => $manifest_text,
});
+ if ($Job->{'output_is_persistent'}) {
+ $arv->{'links'}->{'create'}->execute('link' => {
+ 'tail_kind' => 'arvados#user',
+ 'tail_uuid' => $User->{'uuid'},
+ 'head_kind' => 'arvados#collection',
+ 'head_uuid' => $Job->{'output'},
+ 'link_class' => 'resources',
+ 'name' => 'wants',
+ });
+ }
};
if ($@) {
Log (undef, "Failed to register output manifest: $@");
} split ("\n", $jobstep[$job]->{stderr});
}
+ sub fetch_block
+ {
+ my $hash = shift;
+ my ($keep, $output_block);
+
+ my $cmd = "arv keep get \Q$hash\E";
+ open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
+ sysread($keep, $output_block, 64 * 1024 * 1024);
+ close $keep;
+ return $output_block;
+ }
sub collate_output
{
- my $whc = Warehouse->new;
Log (undef, "collate");
- $whc->write_start (1);
+
+ my ($child_out, $child_in);
+ my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
my $joboutput;
for (@jobstep)
{
if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
{
$output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
- $whc->write_data ($output);
+ print $child_in $output;
}
elsif (@jobstep == 1)
{
$joboutput = $output;
- $whc->write_finish;
+ last;
}
- elsif (defined (my $outblock = $whc->fetch_block ($output)))
+ elsif (defined (my $outblock = fetch_block ($output)))
{
$output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
- $whc->write_data ($outblock);
+ print $child_in $outblock;
}
else
{
- my $errstr = $whc->errstr;
- $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
+ Log (undef, "XXX fetch_block($output) failed XXX");
$main::success = 0;
}
}
- $joboutput = $whc->write_finish if !defined $joboutput;
+ $child_in->close;
+
+ if (!defined $joboutput) {
+ my $s = IO::Select->new($child_out);
+ if ($s->can_read(120)) {
+ sysread($child_out, $joboutput, 64 * 1024 * 1024);
+ } else {
+ Log (undef, "timed out reading from 'arv keep put'");
+ }
+ }
+ waitpid($pid, 0);
+
if ($joboutput)
{
Log (undef, "output $joboutput");
}
print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
- return if !$metastream;
- $metastream->write_data ($datetime . " " . $message);
+ if ($local_logfile) {
+ print $local_logfile $datetime . " " . $message;
+ }
}
sub save_meta
{
my $justcheckpoint = shift; # false if this will be the last meta saved
- my $m = $metastream;
- $m = $m->copy if $justcheckpoint;
- $m->write_finish;
- my $whc = Warehouse->new;
- my $loglocator = $whc->store_block ($m->as_string);
- $arv->{'collections'}->{'create'}->execute('collection' => {
- 'uuid' => $loglocator,
- 'manifest_text' => $m->as_string,
- });
- undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
+ return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
+
+ $local_logfile->flush;
+ my $cmd = "arv keep put --filename \Q$keep_logfile\E "
+ . quotemeta($local_logfile->filename);
+ my $loglocator = `$cmd`;
+ die "system $cmd failed: $?" if $?;
+
+ $local_logfile = undef; # the temp file is automatically deleted
Log (undef, "log manifest is $loglocator");
$Job->{'log'} = $loglocator;
$Job->update_attributes('log', $loglocator) if $job_has_uuid;
sub thaw
{
croak ("Thaw not implemented");
-
- my $whc;
- my $key = shift;
- Log (undef, "thaw from $key");
-
- @jobstep = ();
- @jobstep_done = ();
- @jobstep_todo = ();
- @jobstep_tomerge = ();
- $jobstep_tomerge_level = 0;
- my $frozenjob = {};
-
- my $stream = new Warehouse::Stream ( whc => $whc,
- hash => [split (",", $key)] );
- $stream->rewind;
- while (my $dataref = $stream->read_until (undef, "\n\n"))
- {
- if ($$dataref =~ /^job /)
- {
- foreach (split ("\n", $$dataref))
- {
- my ($k, $v) = split ("=", $_, 2);
- $frozenjob->{$k} = freezeunquote ($v);
- }
- next;
- }
-
- if ($$dataref =~ /^merge (\d+) (.*)/)
- {
- $jobstep_tomerge_level = $1;
- @jobstep_tomerge
- = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
- next;
- }
-
- my $Jobstep = { };
- foreach (split ("\n", $$dataref))
- {
- my ($k, $v) = split ("=", $_, 2);
- $Jobstep->{$k} = freezeunquote ($v) if $k;
- }
- $Jobstep->{'failures'} = 0;
- push @jobstep, $Jobstep;
-
- if ($Jobstep->{exitcode} eq "0")
- {
- push @jobstep_done, $#jobstep;
- }
- else
- {
- push @jobstep_todo, $#jobstep;
- }
- }
-
- foreach (qw (script script_version script_parameters))
- {
- $Job->{$_} = $frozenjob->{$_};
- }
- $Job->save if $job_has_uuid;
}