4410: crunch-job fixups from code review.
[arvados.git] / sdk / cli / bin / crunch-job
index d8ae6e646198868d47bda010d018a6884bc75cab..2246c86fb62907eb61a279cbac9d2d97c242aa1b 100755 (executable)
@@ -98,6 +98,7 @@ use File::Path qw( make_path remove_tree );
 
 use constant TASK_TEMPFAIL => 111;
 use constant EX_TEMPFAIL => 75;
+use constant EX_RETRY_UNLOCKED => 93;
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -721,6 +722,7 @@ ONELEVEL:
 my $thisround_succeeded = 0;
 my $thisround_failed = 0;
 my $thisround_failed_multiple = 0;
+my $working_slot_count = scalar(@slot);
 
 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
@@ -986,6 +988,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     {
       update_progress_stats();
     }
+    $working_slot_count = scalar(grep { $_->{node}->{losing_streak} == 0 &&
+                                        $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
        ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
     {
@@ -1009,10 +1013,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
 
     # give up if no nodes are succeeding
-    if (!grep { $_->{node}->{losing_streak} == 0 &&
-                    $_->{node}->{hold_count} < 4 } @slot) {
-      my $message = "Every node has failed -- giving up on this round";
-      Log (undef, $message);
+    if ($working_slot_count < 1) {
+      Log(undef, "Every node has failed -- giving up");
       last THISROUND;
     }
   }
@@ -1048,18 +1050,18 @@ freeze_if_want_freeze();
 
 if (!defined $main::success)
 {
-  if (@jobstep_todo &&
-      $thisround_succeeded == 0 &&
-      ($thisround_failed == 0 || $thisround_failed > 4))
-  {
+  if (!@jobstep_todo) {
+    $main::success = 1;
+  } elsif ($working_slot_count < 1) {
+    save_output_collection();
+    save_meta();
+    exit(EX_RETRY_UNLOCKED);
+  } elsif ($thisround_succeeded == 0 &&
+           ($thisround_failed == 0 || $thisround_failed > 4)) {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
     $main::success = 0;
   }
-  if (!@jobstep_todo)
-  {
-    $main::success = 1;
-  }
 }
 
 goto ONELEVEL if !defined $main::success;
@@ -1067,16 +1069,7 @@ goto ONELEVEL if !defined $main::success;
 
 release_allocation();
 freeze();
-my $collated_output = &create_output_collection();
-
-if (!$collated_output) {
-  Log (undef, "Failed to write output collection");
-}
-else {
-  Log(undef, "job output $collated_output");
-  $Job->update_attributes('output' => $collated_output);
-}
-
+my $collated_output = save_output_collection();
 Log (undef, "finish");
 
 save_meta();
@@ -1511,6 +1504,20 @@ print (arvados.api("v1").collections().
   return $joboutput;
 }
 
+# Calls create_output_collection, logs the result, and returns it.
+# If that was successful, save that as the output in the job record.
+sub save_output_collection {
+  my $collated_output = create_output_collection();
+
+  if (!$collated_output) {
+    Log(undef, "Failed to write output collection");
+  }
+  else {
+    Log(undef, "job output $collated_output");
+    $Job->update_attributes('output' => $collated_output);
+  }
+  return $collated_output;
+}
 
 sub killem
 {
@@ -1556,6 +1563,8 @@ sub fhbits
 # Send log output to Keep via arv-put.
 #
 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_out_buf is a string containing all output read from arv-put so far.
+# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
 # $log_pipe_pid is the pid of the arv-put subprocess.
 #
 # The only functions that should access these variables directly are:
@@ -1564,6 +1573,13 @@ sub fhbits
 #     Starts an arv-put pipe, reading data on stdin and writing it to
 #     a $logfilename file in an output collection.
 #
+# log_writer_read_output([$timeout])
+#     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
+#     Passes $timeout to the select() call, with a default of 0.01.
+#     Returns the result of the last read() call on $log_pipe_out, or
+#     -1 if read() wasn't called because select() timed out.
+#     Only other log_writer_* functions should need to call this.
+#
 # log_writer_send($txt)
 #     Writes $txt to the output log collection.
 #
@@ -1574,25 +1590,40 @@ sub fhbits
 #     Returns a true value if there is currently a live arv-put
 #     process, false otherwise.
 #
-my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
+    $log_pipe_pid);
 
 sub log_writer_start($)
 {
   my $logfilename = shift;
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
                         'arv-put',
-                        '--portable-data-hash',
-                        '--project-uuid', $Job->{owner_uuid},
+                        '--stream',
                         '--retries', '3',
-                        '--name', $logfilename,
                         '--filename', $logfilename,
                         '-');
+  $log_pipe_out_buf = "";
+  $log_pipe_out_select = IO::Select->new($log_pipe_out);
+}
+
+sub log_writer_read_output {
+  my $timeout = shift || 0.01;
+  my $read = -1;
+  while ($read && $log_pipe_out_select->can_read($timeout)) {
+    $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
+                 length($log_pipe_out_buf));
+  }
+  if (!defined($read)) {
+    Log(undef, "error reading log manifest from arv-put: $!");
+  }
+  return $read;
 }
 
 sub log_writer_send($)
 {
   my $txt = shift;
   print $log_pipe_in $txt;
+  log_writer_read_output();
 }
 
 sub log_writer_finish()
@@ -1600,22 +1631,24 @@ sub log_writer_finish()
   return unless $log_pipe_pid;
 
   close($log_pipe_in);
-  my $arv_put_output;
 
-  my $s = IO::Select->new($log_pipe_out);
-  if ($s->can_read(120)) {
-    sysread($log_pipe_out, $arv_put_output, 1024);
-    chomp($arv_put_output);
-  } else {
+  my $read_result = log_writer_read_output(120);
+  if ($read_result == -1) {
     Log (undef, "timed out reading from 'arv-put'");
+  } elsif ($read_result != 0) {
+    Log(undef, "failed to read arv-put log manifest to EOF");
   }
 
   waitpid($log_pipe_pid, 0);
-  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
   if ($?) {
-    Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
   }
 
+  close($log_pipe_out);
+  my $arv_put_output = $log_pipe_out_buf;
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
+      $log_pipe_out_select = undef;
+
   return $arv_put_output;
 }
 
@@ -1679,10 +1712,21 @@ sub save_meta
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
   return unless log_writer_is_active();
 
-  my $loglocator = log_writer_finish();
-  Log (undef, "log manifest is $loglocator");
-  $Job->{'log'} = $loglocator;
-  $Job->update_attributes('log', $loglocator);
+  my $log_manifest = "";
+  if ($Job->{log}) {
+    my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
+    $log_manifest .= $prev_log_coll->{manifest_text};
+  }
+  $log_manifest .= log_writer_finish();
+
+  my $log_coll = api_call(
+    "collections/create", ensure_unique_name => 1, collection => {
+      manifest_text => $log_manifest,
+      owner_uuid => $Job->{owner_uuid},
+      name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
+    });
+  Log(undef, "log collection is " . $log_coll->{portable_data_hash});
+  $Job->update_attributes('log' => $log_coll->{portable_data_hash});
 }