my $jobstep_tomerge_level = 0;
my $squeue_checked;
my $squeue_kill_checked;
-my $output_in_keep = 0;
my $latest_refresh = scalar time;
{
$main::please_info = 0;
freeze();
- collate_output();
+ create_output_collection();
save_meta(1);
update_progress_stats();
}
$main::please_continue = 0;
goto THISROUND;
}
- $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
+ $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
readfrompipes ();
if (!reapchildren())
{
release_allocation();
freeze();
-my $collated_output = &collate_output();
+my $collated_output = &create_output_collection();
if (!$collated_output) {
- Log(undef, "output undef");
+ Log (undef, "Failed to write output collection");
}
else {
- eval {
- open(my $orig_manifest, '-|', 'arv-get', $collated_output)
- or die "failed to get collated manifest: $!";
- my $orig_manifest_text = '';
- while (my $manifest_line = <$orig_manifest>) {
- $orig_manifest_text .= $manifest_line;
- }
- my $output = api_call("collections/create", collection => {
- 'manifest_text' => $orig_manifest_text});
- Log(undef, "output uuid " . $output->{uuid});
- Log(undef, "output hash " . $output->{portable_data_hash});
- $Job->update_attributes('output' => $output->{portable_data_hash});
- };
- if ($@) {
- Log (undef, "Failed to register output manifest: $@");
- }
+ Log(undef, "output hash " . $collated_output);
+ $Job->update_attributes('output' => $collated_output);
}
Log (undef, "finish");
return $output_block;
}
-sub collate_output
+# create_output_collections generates a new collection containing the
+# output of each successfully completed task, and returns the
+# portable_data_hash for the new collection.
+#
+sub create_output_collection
{
Log (undef, "collate");
my ($child_out, $child_in);
- my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
- '--retries', retry_count());
- my $joboutput;
+ my $pid = open2($child_out, $child_in, 'python', '-c',
+ 'import arvados; ' .
+ 'import sys; ' .
+ 'print arvados.api()' .
+ '.collections()' .
+ '.create(body={"manifest_text":sys.stdin.read()})' .
+ '.execute()["portable_data_hash"]'
+ );
+
for (@jobstep)
{
next if (!exists $_->{'arvados_task'}->{'output'} ||
my $output = $_->{'arvados_task'}->{output};
if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
{
- $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
print $child_in $output;
}
- elsif (@jobstep == 1)
- {
- $joboutput = $output;
- last;
- }
elsif (defined (my $outblock = fetch_block ($output)))
{
- $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
print $child_in $outblock;
}
else
}
$child_in->close;
- if (!defined $joboutput) {
- my $s = IO::Select->new($child_out);
- if ($s->can_read(120)) {
- sysread($child_out, $joboutput, 64 * 1024 * 1024);
- chomp($joboutput);
- # TODO: Ensure exit status == 0.
- } else {
- Log (undef, "timed out reading from 'arv-put'");
- }
+ my $joboutput;
+ my $s = IO::Select->new($child_out);
+ if ($s->can_read(120)) {
+ sysread($child_out, $joboutput, 64 * 1024 * 1024);
+ chomp($joboutput);
+ # TODO: Ensure exit status == 0.
+ } else {
+ Log (undef, "timed out while creating output collection");
}
# TODO: kill $pid instead of waiting, now that we've decided to
# ignore further output.
my $message = "@_ at $file line $line\n";
Log (undef, $message);
freeze() if @jobstep_todo;
- collate_output() if @jobstep_todo;
+ create_output_collection() if @jobstep_todo;
cleanup();
save_meta();
die;
}
}
freeze();
- collate_output();
+ create_output_collection();
cleanup();
save_meta();
exit 1;
if ($next_try_at < time) {
$retry_msg = "Retrying.";
} else {
- my $next_try_fmt = strftime("%Y-%m-%d %H:%M:%S", $next_try_at);
+ my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
$retry_msg = "Retrying at $next_try_fmt.";
}
Log(undef, "API method $method_name failed: $errmsg. $retry_msg");