Log (undef, "Failed to write output collection");
}
else {
- Log(undef, "output hash " . $collated_output);
+ Log(undef, "job output $collated_output");
$Job->update_attributes('output' => $collated_output);
}
if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
# Give up on this task, and the whole job
$main::success = 0;
- $main::please_freeze = 1;
}
# Put this task back on the todo queue
push @jobstep_todo, $jobstepid;
$Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
$Jobstep->{'arvados_task'}->save;
process_stderr ($jobstepid, $task_success);
- Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
+ Log ($jobstepid, sprintf("task output (%d bytes): %s",
+ length($Jobstep->{'arvados_task'}->{output}),
+ $Jobstep->{'arvados_task'}->{output}));
close $reader{$jobstepid};
delete $reader{$jobstepid};
sub fetch_block
{
my $hash = shift;
- my ($keep, $child_out, $output_block);
-
- my $cmd = "arv-get \Q$hash\E";
- open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
- $output_block = '';
+ my $keep;
+ if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
+ Log(undef, "fetch_block run error from arv-get $hash: $!");
+ return undef;
+ }
+ my $output_block = "";
while (1) {
my $buf;
my $bytes = sysread($keep, $buf, 1024 * 1024);
if (!defined $bytes) {
- die "reading from arv-get: $!";
+ Log(undef, "fetch_block read error from arv-get: $!");
+ $output_block = undef;
+ last;
} elsif ($bytes == 0) {
# sysread returns 0 at the end of the pipe.
last;
}
}
close $keep;
+ if ($?) {
+ Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
+ $output_block = undef;
+ }
return $output_block;
}
-# create_output_collections generates a new collection containing the
-# output of each successfully completed task, and returns the
-# portable_data_hash for the new collection.
-#
+# Create a collection by concatenating the output of all tasks (each
+# task's output is either a manifest fragment, a locator for a
+# manifest fragment stored in Keep, or nothing at all). Return the
+# portable_data_hash of the new collection.
sub create_output_collection
{
Log (undef, "collate");
my ($child_out, $child_in);
- my $pid = open2($child_out, $child_in, 'python', '-c',
- 'import arvados; ' .
- 'import sys; ' .
- 'print arvados.api()' .
- '.collections()' .
- '.create(body={"manifest_text":sys.stdin.read()})' .
- '.execute()["portable_data_hash"]'
- );
-
+ my $pid = open2($child_out, $child_in, 'python', '-c', q{
+import arvados
+import sys
+print (arvados.api("v1").collections().
+ create(body={"manifest_text": sys.stdin.read()}).
+ execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
+}, retry_count());
+
+ my $task_idx = -1;
+ my $manifest_size = 0;
for (@jobstep)
{
- next if (!exists $_->{'arvados_task'}->{'output'} ||
- !$_->{'arvados_task'}->{'success'});
+ ++$task_idx;
my $output = $_->{'arvados_task'}->{output};
- if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
- {
- print $child_in $output;
- }
- elsif (defined (my $outblock = fetch_block ($output)))
- {
- print $child_in $outblock;
+ next if (!defined($output));
+ my $next_write;
+ if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
+ $next_write = fetch_block($output);
+ } else {
+ $next_write = $output;
}
- else
- {
- Log (undef, "XXX fetch_block($output) failed XXX");
+ if (defined($next_write)) {
+ if (!defined(syswrite($child_in, $next_write))) {
+ # There's been an error writing. Stop the loop.
+ # We'll log details about the exit code later.
+ last;
+ } else {
+ $manifest_size += length($next_write);
+ }
+ } else {
+ my $uuid = $_->{'arvados_task'}->{'uuid'};
+ Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
$main::success = 0;
}
}
- $child_in->close;
+ close($child_in);
+ Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
my $joboutput;
my $s = IO::Select->new($child_out);
if ($s->can_read(120)) {
- sysread($child_out, $joboutput, 64 * 1024 * 1024);
- chomp($joboutput);
- # TODO: Ensure exit status == 0.
+ sysread($child_out, $joboutput, 1024 * 1024);
+ waitpid($pid, 0);
+ if ($?) {
+ Log(undef, "output collection creation exited " . exit_status_s($?));
+ $joboutput = undef;
+ } else {
+ chomp($joboutput);
+ }
} else {
Log (undef, "timed out while creating output collection");
+ foreach my $signal (2, 2, 2, 15, 15, 9) {
+ kill($signal, $pid);
+ last if waitpid($pid, WNOHANG) == -1;
+ sleep(1);
+ }
}
- # TODO: kill $pid instead of waiting, now that we've decided to
- # ignore further output.
- waitpid($pid, 0);
+ close($child_out);
return $joboutput;
}
{
my $logfilename = shift;
$log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
- 'arv-put', '--portable-data-hash',
+ 'arv-put',
+ '--portable-data-hash',
+ '--project-uuid', $Job->{owner_uuid},
'--retries', '3',
+ '--name', $logfilename,
'--filename', $logfilename,
'-');
}