Merge branch '4621-crunch-memory-usage'
authorTim Pierce <twp@curoverse.com>
Tue, 2 Dec 2014 21:17:26 +0000 (16:17 -0500)
committerTim Pierce <twp@curoverse.com>
Tue, 2 Dec 2014 21:17:26 +0000 (16:17 -0500)
Refs #4621.

sdk/cli/bin/crunch-job

index 0d35d53f9d2b924ea8b583fda5b5a3a682be09fb..7aa02ac6994dc1444a11c6dc6eaf732361021932 100755 (executable)
@@ -305,7 +305,6 @@ my @jobstep_tomerge = ();
 my $jobstep_tomerge_level = 0;
 my $squeue_checked;
 my $squeue_kill_checked;
-my $output_in_keep = 0;
 my $latest_refresh = scalar time;
 
 
@@ -823,7 +822,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     {
       $main::please_info = 0;
       freeze();
-      collate_output();
+      create_output_collection();
       save_meta(1);
       update_progress_stats();
     }
@@ -885,7 +884,7 @@ while (%proc)
     $main::please_continue = 0;
     goto THISROUND;
   }
-  $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
+  $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
   readfrompipes ();
   if (!reapchildren())
   {
@@ -922,28 +921,14 @@ goto ONELEVEL if !defined $main::success;
 
 release_allocation();
 freeze();
-my $collated_output = &collate_output();
+my $collated_output = &create_output_collection();
 
 if (!$collated_output) {
-  Log(undef, "output undef");
+  Log (undef, "Failed to write output collection");
 }
 else {
-  eval {
-    open(my $orig_manifest, '-|', 'arv-get', $collated_output)
-        or die "failed to get collated manifest: $!";
-    my $orig_manifest_text = '';
-    while (my $manifest_line = <$orig_manifest>) {
-      $orig_manifest_text .= $manifest_line;
-    }
-    my $output = api_call("collections/create", collection => {
-      'manifest_text' => $orig_manifest_text});
-    Log(undef, "output uuid " . $output->{uuid});
-    Log(undef, "output hash " . $output->{portable_data_hash});
-    $Job->update_attributes('output' => $output->{portable_data_hash});
-  };
-  if ($@) {
-    Log (undef, "Failed to register output manifest: $@");
-  }
+  Log(undef, "output hash " . $collated_output);
+  $Job->update_attributes('output' => $collated_output);
 }
 
 Log (undef, "finish");
@@ -1275,14 +1260,24 @@ sub fetch_block
   return $output_block;
 }
 
-sub collate_output
+# create_output_collections generates a new collection containing the
+# output of each successfully completed task, and returns the
+# portable_data_hash for the new collection.
+#
+sub create_output_collection
 {
   Log (undef, "collate");
 
   my ($child_out, $child_in);
-  my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
-                  '--retries', retry_count());
-  my $joboutput;
+  my $pid = open2($child_out, $child_in, 'python', '-c',
+                  'import arvados; ' .
+                  'import sys; ' .
+                  'print arvados.api()' .
+                  '.collections()' .
+                  '.create(body={"manifest_text":sys.stdin.read()})' .
+                  '.execute()["portable_data_hash"]'
+      );
+
   for (@jobstep)
   {
     next if (!exists $_->{'arvados_task'}->{'output'} ||
@@ -1290,17 +1285,10 @@ sub collate_output
     my $output = $_->{'arvados_task'}->{output};
     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
     {
-      $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
       print $child_in $output;
     }
-    elsif (@jobstep == 1)
-    {
-      $joboutput = $output;
-      last;
-    }
     elsif (defined (my $outblock = fetch_block ($output)))
     {
-      $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
       print $child_in $outblock;
     }
     else
@@ -1311,15 +1299,14 @@ sub collate_output
   }
   $child_in->close;
 
-  if (!defined $joboutput) {
-    my $s = IO::Select->new($child_out);
-    if ($s->can_read(120)) {
-      sysread($child_out, $joboutput, 64 * 1024 * 1024);
-      chomp($joboutput);
-      # TODO: Ensure exit status == 0.
-    } else {
-      Log (undef, "timed out reading from 'arv-put'");
-    }
+  my $joboutput;
+  my $s = IO::Select->new($child_out);
+  if ($s->can_read(120)) {
+    sysread($child_out, $joboutput, 64 * 1024 * 1024);
+    chomp($joboutput);
+    # TODO: Ensure exit status == 0.
+  } else {
+    Log (undef, "timed out while creating output collection");
   }
   # TODO: kill $pid instead of waiting, now that we've decided to
   # ignore further output.
@@ -1469,7 +1456,7 @@ sub croak
   my $message = "@_ at $file line $line\n";
   Log (undef, $message);
   freeze() if @jobstep_todo;
-  collate_output() if @jobstep_todo;
+  create_output_collection() if @jobstep_todo;
   cleanup();
   save_meta();
   die;
@@ -1521,7 +1508,7 @@ sub freeze_if_want_freeze
       }
     }
     freeze();
-    collate_output();
+    create_output_collection();
     cleanup();
     save_meta();
     exit 1;
@@ -1702,7 +1689,7 @@ sub api_call {
     if ($next_try_at < time) {
       $retry_msg = "Retrying.";
     } else {
-      my $next_try_fmt = strftime("%Y-%m-%d %H:%M:%S", $next_try_at);
+      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
       $retry_msg = "Retrying at $next_try_fmt.";
     }
     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");