return $output_block;
}
-# create_output_collections generates a new collection containing the
-# output of each successfully completed task, and returns the
-# portable_data_hash for the new collection.
-#
+# Create a collection by concatenating the output of all tasks (each
+# task's output is either a manifest fragment, a locator for a
+# manifest fragment stored in Keep, or nothing at all). Return the
+# portable_data_hash of the new collection.
sub create_output_collection
{
Log (undef, "collate");
'.execute()["portable_data_hash"]'
);
+ my $task_idx = -1;
for (@jobstep)
{
- next if (!exists $_->{'arvados_task'}->{'output'} ||
- !$_->{'arvados_task'}->{'success'});
+ ++$task_idx;
+ next unless exists $_->{'arvados_task'}->{'output'};
my $output = $_->{'arvados_task'}->{output};
if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
{
}
else
{
- Log (undef, "XXX fetch_block($output) failed XXX");
+ my $uuid = $_->{'arvados_task'}->{'uuid'};
+ Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
$main::success = 0;
}
}
unlink "$destdir.commit";
mkdir $destdir;
-open TARX, "|-", "tar", "-xC", $destdir;
-{
- local $/ = undef;
- print TARX <DATA>;
+
+if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
+ die "Error launching 'tar -xC $destdir': $!";
+}
+# If we send too much data to tar in one write (> 4-5 MiB), tar stops reading
+# and we get SIGPIPE, so we must feed it the data incrementally.
+my $tar_input;
+while (read(DATA, $tar_input, 65536)) {
+ print TARX $tar_input;
}
if(!close(TARX)) {
die "'tar -xC $destdir' exited $?: $!";