4410: Crunch retries jobs when all SLURM nodes fail.
authorBrett Smith <brett@curoverse.com>
Mon, 15 Jun 2015 17:54:36 +0000 (13:54 -0400)
committerBrett Smith <brett@curoverse.com>
Mon, 22 Jun 2015 20:49:52 +0000 (16:49 -0400)
See the ticket for detailed background discussion and implementation
rationale, especially notes 13 and 14.

This required a couple of ancillary changes:

* crunch-job now makes a distinction between "task failed because a
  node failed," and "task failed for other temporary reason."  It uses
  this additional information to decide when it should retry tasks
  itself, and when it needs to give up and kick the problem up to
  crunch-dispatch.

* crunch-job now handles creating log collections itself from
  manifests generated by arv-put.  This enables it to append to logs
  generated during previous attempts to run the job.

sdk/cli/bin/crunch-job
services/api/script/crunch-dispatch.rb

index d8ae6e646198868d47bda010d018a6884bc75cab..b38efdc53eca6ea6f3e3dd5ab9604ed07b46efea 100755 (executable)
@@ -98,6 +98,7 @@ use File::Path qw( make_path remove_tree );
 
 use constant TASK_TEMPFAIL => 111;
 use constant EX_TEMPFAIL => 75;
+use constant EX_RETRY_UNLOCKED => 93;
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -292,9 +293,16 @@ foreach (@sinfo)
   {
     Log (undef, "node $nodename - $ncpus slots");
     my $node = { name => $nodename,
-                ncpus => $ncpus,
-                losing_streak => 0,
-                hold_until => 0 };
+                 ncpus => $ncpus,
+                 # The number of consecutive times a task has been dispatched
+                 # to this node and failed.
+                 losing_streak => 0,
+                 # The number of consecutive times that SLURM has reported
+                 # a node failure since the last successful task.
+                 fail_count => 0,
+                 # Don't dispatch work to this node until this time
+                 # (in seconds since the epoch) has passed.
+                 hold_until => 0 };
     foreach my $cpu (1..$ncpus)
     {
       push @slot, { node => $node,
@@ -721,6 +729,7 @@ ONELEVEL:
 my $thisround_succeeded = 0;
 my $thisround_failed = 0;
 my $thisround_failed_multiple = 0;
+my $working_slot_count = scalar(@slot);
 
 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
@@ -950,6 +959,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
   $Jobstep->{slotindex} = $childslot;
   delete $Jobstep->{stderr};
   delete $Jobstep->{finishtime};
+  delete $Jobstep->{tempfail};
 
   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
   $Jobstep->{'arvados_task'}->save;
@@ -986,6 +996,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     {
       update_progress_stats();
     }
+    $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
+                                        $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
        ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
     {
@@ -1009,10 +1021,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
 
     # give up if no nodes are succeeding
-    if (!grep { $_->{node}->{losing_streak} == 0 &&
-                    $_->{node}->{hold_count} < 4 } @slot) {
-      my $message = "Every node has failed -- giving up on this round";
-      Log (undef, $message);
+    if ($working_slot_count < 1) {
+      Log(undef, "Every node has failed -- giving up");
       last THISROUND;
     }
   }
@@ -1048,18 +1058,18 @@ freeze_if_want_freeze();
 
 if (!defined $main::success)
 {
-  if (@jobstep_todo &&
-      $thisround_succeeded == 0 &&
-      ($thisround_failed == 0 || $thisround_failed > 4))
-  {
+  if (!@jobstep_todo) {
+    $main::success = 1;
+  } elsif ($working_slot_count < 1) {
+    save_output_collection();
+    save_meta();
+    exit(EX_RETRY_UNLOCKED);
+  } elsif ($thisround_succeeded == 0 &&
+           ($thisround_failed == 0 || $thisround_failed > 4)) {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
     $main::success = 0;
   }
-  if (!@jobstep_todo)
-  {
-    $main::success = 1;
-  }
 }
 
 goto ONELEVEL if !defined $main::success;
@@ -1067,16 +1077,7 @@ goto ONELEVEL if !defined $main::success;
 
 release_allocation();
 freeze();
-my $collated_output = &create_output_collection();
-
-if (!$collated_output) {
-  Log (undef, "Failed to write output collection");
-}
-else {
-  Log(undef, "job output $collated_output");
-  $Job->update_attributes('output' => $collated_output);
-}
-
+my $collated_output = save_output_collection();
 Log (undef, "finish");
 
 save_meta();
@@ -1141,7 +1142,7 @@ sub reapchildren
   if (!$task_success)
   {
     my $temporary_fail;
-    $temporary_fail ||= $Jobstep->{node_fail};
+    $temporary_fail ||= $Jobstep->{tempfail};
     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
 
     ++$thisround_failed;
@@ -1179,6 +1180,7 @@ sub reapchildren
     ++$thisround_succeeded;
     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+    $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
     push @jobstep_done, $jobstepid;
     Log ($jobstepid, "success in $elapsed seconds");
   }
@@ -1389,10 +1391,19 @@ sub preprocess_stderr
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
-      $jobstep[$job]->{node_fail} = 1;
+    elsif ($line =~ /srun: error: Node failure on/) {
+      my $job_slot_index = $jobstep[$job]->{slotindex};
+      $slot[$job_slot_index]->{node}->{fail_count}++;
+      $jobstep[$job]->{tempfail} = 1;
+      ban_node_by_slot($job_slot_index);
+    }
+    elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
+      $jobstep[$job]->{tempfail} = 1;
       ban_node_by_slot($jobstep[$job]->{slotindex});
     }
+    elsif ($line =~ /arvados\.errors\.Keep/) {
+      $jobstep[$job]->{tempfail} = 1;
+    }
   }
 }
 
@@ -1511,6 +1522,20 @@ print (arvados.api("v1").collections().
   return $joboutput;
 }
 
+# Calls create_output_collection, logs the result, and returns it.
+# If that was successful, save that as the output in the job record.
+sub save_output_collection {
+  my $collated_output = create_output_collection();
+
+  if (!$collated_output) {
+    Log(undef, "Failed to write output collection");
+  }
+  else {
+    Log(undef, "job output $collated_output");
+    $Job->update_attributes('output' => $collated_output);
+  }
+  return $collated_output;
+}
 
 sub killem
 {
@@ -1556,6 +1581,8 @@ sub fhbits
 # Send log output to Keep via arv-put.
 #
 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_out_buf is a string containing all output read from arv-put so far.
+# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
 # $log_pipe_pid is the pid of the arv-put subprocess.
 #
 # The only functions that should access these variables directly are:
@@ -1564,6 +1591,13 @@ sub fhbits
 #     Starts an arv-put pipe, reading data on stdin and writing it to
 #     a $logfilename file in an output collection.
 #
+# log_writer_read_output([$timeout])
+#     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
+#     Passes $timeout to the select() call, with a default of 0.01.
+#     Returns the result of the last read() call on $log_pipe_out, or
+#     -1 if read() wasn't called because select() timed out.
+#     Only other log_writer_* functions should need to call this.
+#
 # log_writer_send($txt)
 #     Writes $txt to the output log collection.
 #
@@ -1574,25 +1608,40 @@ sub fhbits
 #     Returns a true value if there is currently a live arv-put
 #     process, false otherwise.
 #
-my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
+    $log_pipe_pid);
 
 sub log_writer_start($)
 {
   my $logfilename = shift;
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
                         'arv-put',
-                        '--portable-data-hash',
-                        '--project-uuid', $Job->{owner_uuid},
+                        '--stream',
                         '--retries', '3',
-                        '--name', $logfilename,
                         '--filename', $logfilename,
                         '-');
+  $log_pipe_out_buf = "";
+  $log_pipe_out_select = IO::Select->new($log_pipe_out);
+}
+
+sub log_writer_read_output {
+  my $timeout = shift || 0.01;
+  my $read = -1;
+  while ($read && $log_pipe_out_select->can_read($timeout)) {
+    $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
+                 length($log_pipe_out_buf));
+  }
+  if (!defined($read)) {
+    Log(undef, "error reading log manifest from arv-put: $!");
+  }
+  return $read;
 }
 
 sub log_writer_send($)
 {
   my $txt = shift;
   print $log_pipe_in $txt;
+  log_writer_read_output();
 }
 
 sub log_writer_finish()
@@ -1600,22 +1649,24 @@ sub log_writer_finish()
   return unless $log_pipe_pid;
 
   close($log_pipe_in);
-  my $arv_put_output;
 
-  my $s = IO::Select->new($log_pipe_out);
-  if ($s->can_read(120)) {
-    sysread($log_pipe_out, $arv_put_output, 1024);
-    chomp($arv_put_output);
-  } else {
+  my $read_result = log_writer_read_output(120);
+  if ($read_result == -1) {
     Log (undef, "timed out reading from 'arv-put'");
+  } elsif ($read_result != 0) {
+    Log(undef, "failed to read arv-put log manifest to EOF");
   }
 
   waitpid($log_pipe_pid, 0);
-  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
   if ($?) {
-    Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
   }
 
+  close($log_pipe_out);
+  my $arv_put_output = $log_pipe_out_buf;
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
+      $log_pipe_out_select = undef;
+
   return $arv_put_output;
 }
 
@@ -1679,10 +1730,21 @@ sub save_meta
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
   return unless log_writer_is_active();
 
-  my $loglocator = log_writer_finish();
-  Log (undef, "log manifest is $loglocator");
-  $Job->{'log'} = $loglocator;
-  $Job->update_attributes('log', $loglocator);
+  my $log_manifest = "";
+  if ($Job->{log}) {
+    my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
+    $log_manifest .= $prev_log_coll->{manifest_text};
+  }
+  $log_manifest .= log_writer_finish();
+
+  my $log_coll = api_call(
+    "collections/create", ensure_unique_name => 1, collection => {
+      manifest_text => $log_manifest,
+      owner_uuid => $Job->{owner_uuid},
+      name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
+    });
+  Log(undef, "log collection is " . $log_coll->{portable_data_hash});
+  $Job->update_attributes('log' => $log_coll->{portable_data_hash});
 }
 
 
index 3c1e4c60cd4bba2185f91c9025d61fddeb31dbf0..515bfaada00cf768c602bba154c9045d33cca3f5 100755 (executable)
@@ -54,6 +54,8 @@ class Dispatcher
   include ApplicationHelper
 
   EXIT_TEMPFAIL = 75
+  EXIT_RETRY_UNLOCKED = 93
+  RETRY_UNLOCKED_LIMIT = 3
 
   def initialize
     @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
@@ -77,6 +79,8 @@ class Dispatcher
     @pipe_auth_tokens = {}
     @running = {}
     @todo = []
+    @todo_job_retries = {}
+    @job_retry_counts = Hash.new(0)
     @todo_pipelines = []
   end
 
@@ -86,7 +90,7 @@ class Dispatcher
 
   def refresh_todo
     if $options[:jobs]
-      @todo = Job.queue.select(&:repository)
+      @todo = @todo_job_retries.values + Job.queue.select(&:repository)
     end
     if $options[:pipelines]
       @todo_pipelines = PipelineInstance.queue
@@ -417,6 +421,10 @@ class Dispatcher
                    '--job', job.uuid,
                    '--git-dir', @arvados_internal]
 
+      if @todo_job_retries.include?(job.uuid)
+        cmd_args << "--force-unlock"
+      end
+
       $stderr.puts "dispatch: #{cmd_args.join ' '}"
 
       begin
@@ -452,6 +460,7 @@ class Dispatcher
         log_throttle_bytes_skipped: 0,
       }
       i.close
+      @todo_job_retries.delete(job.uuid)
       update_node_status
     end
   end
@@ -650,26 +659,49 @@ class Dispatcher
 
     # Wait the thread (returns a Process::Status)
     exit_status = j_done[:wait_thr].value.exitstatus
+    exit_tempfail = exit_status == EXIT_TEMPFAIL
 
     $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}"
     $stderr.puts "dispatch: job #{job_done.uuid} end"
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
-    if exit_status != EXIT_TEMPFAIL and jobrecord.state == "Running"
-      # crunch-job did not return exit code 75 (see below) and left the job in
-      # the "Running" state, which means there was an unhandled error.  Fail
-      # the job.
-      jobrecord.state = "Failed"
-      if not jobrecord.save
-        $stderr.puts "dispatch: jobrecord.save failed"
+
+    if exit_status == EXIT_RETRY_UNLOCKED
+      # The job failed because all of the nodes allocated to it
+      # failed.  Only this crunch-dispatch process can retry the job:
+      # it's already locked, and there's no way to put it back in the
+      # Queued state.  Put it in our internal todo list unless the job
+      # has failed this way excessively.
+      @job_retry_counts[jobrecord.uuid] += 1
+      exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
+      if exit_tempfail
+        @todo_job_retries[jobrecord.uuid] = jobrecord
+      else
+        $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up")
+      end
+    end
+
+    if !exit_tempfail
+      @job_retry_counts.delete(jobrecord.uuid)
+      if jobrecord.state == "Running"
+        # Apparently there was an unhandled error.  That could potentially
+        # include "all allocated nodes failed" when we don't to retry
+        # because the job has already been retried RETRY_UNLOCKED_LIMIT
+        # times.  Fail the job.
+        jobrecord.state = "Failed"
+        if not jobrecord.save
+          $stderr.puts "dispatch: jobrecord.save failed"
+        end
       end
     else
-      # Don't fail the job if crunch-job didn't even get as far as
-      # starting it. If the job failed to run due to an infrastructure
+      # If the job failed to run due to an infrastructure
       # issue with crunch-job or slurm, we want the job to stay in the
       # queue. If crunch-job exited after losing a race to another
       # crunch-job process, it exits 75 and we should leave the job
-      # record alone so the winner of the race do its thing.
+      # record alone so the winner of the race can do its thing.
+      # If crunch-job exited after all of its allocated nodes failed,
+      # it exits 93, and we want to retry it later (see the
+      # EXIT_RETRY_UNLOCKED `if` block).
       #
       # There is still an unhandled race condition: If our crunch-job
       # process is about to lose a race with another crunch-job
@@ -683,7 +715,7 @@ class Dispatcher
 
     # Invalidate the per-job auth token, unless the job is still queued and we
     # might want to try it again.
-    if jobrecord.state != "Queued"
+    if jobrecord.state != "Queued" and !@todo_job_retries.include?(jobrecord.uuid)
       j_done[:job_auth].update_attributes expires_at: Time.now
     end
 
@@ -737,6 +769,14 @@ class Dispatcher
       select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
              [], [], 1)
     end
+    # If there are jobs we wanted to retry, we have to mark them as failed now.
+    # Other dispatchers can't pick them up because we hold their lock.
+    @todo_job_retries.each_key do |job_uuid|
+      job = Job.find_by_uuid(job_uuid)
+      if job.state == "Running"
+        fail_job(job, "crunch-dispatch was stopped during job's tempfail retry loop")
+      end
+    end
   end
 
   protected