8099: 7263: Merge branch 'hgi/7263-even-better-busy-behavior' of github.com:wtsi...
authorTom Clegg <tom@curoverse.com>
Mon, 29 Feb 2016 21:00:39 +0000 (16:00 -0500)
committerTom Clegg <tom@curoverse.com>
Mon, 29 Feb 2016 21:00:39 +0000 (16:00 -0500)
Conflicts:
sdk/cli/bin/crunch-job

sdk/cli/bin/crunch-job

index b63886e10504a725b8a7374f5861e2a033b2b57c..ca9db1dacdb162d73ee6eb53bb79d5ec2d550014 100755 (executable)
@@ -747,6 +747,7 @@ if ($initial_tasks_this_level < @node) {
   @freeslot = (0..$#slot);
 }
 my $round_num_freeslots = scalar(@freeslot);
+print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
 
 my %round_max_slots = ();
 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
@@ -1130,122 +1131,133 @@ sub update_progress_stats
 
 sub reapchildren
 {
-  my $pid = waitpid (-1, WNOHANG);
-  return 0 if $pid <= 0;
-
-  my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
-                 . "."
-                 . $slot[$proc{$pid}->{slot}]->{cpu});
-  my $jobstepidx = $proc{$pid}->{jobstepidx};
-  my $elapsed = time - $proc{$pid}->{time};
-  my $Jobstep = $jobstep[$jobstepidx];
-
-  my $childstatus = $?;
-  my $exitvalue = $childstatus >> 8;
-  my $exitinfo = "exit ".exit_status_s($childstatus);
-  $Jobstep->{'arvados_task'}->reload;
-  my $task_success = $Jobstep->{'arvados_task'}->{success};
-
-  Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
-
-  if (!defined $task_success) {
-    # task did not indicate one way or the other --> fail
-    Log($jobstepidx, sprintf(
-          "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
-          exit_status_s($childstatus)));
-    $Jobstep->{'arvados_task'}->{success} = 0;
-    $Jobstep->{'arvados_task'}->save;
-    $task_success = 0;
-  }
-
-  if (!$task_success)
+  my $children_reaped = 0;
+  while ((my $pid = waitpid (-1, WNOHANG)) > 0)
   {
-    my $temporary_fail;
-    $temporary_fail ||= $Jobstep->{tempfail};
-    $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
-
-    ++$thisround_failed;
-    ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
-
-    # Check for signs of a failed or misconfigured node
-    if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
-       2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
-      # Don't count this against jobstep failure thresholds if this
-      # node is already suspected faulty and srun exited quickly
-      if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
-         $elapsed < 5) {
-       Log ($jobstepidx, "blaming failure on suspect node " .
-             $slot[$proc{$pid}->{slot}]->{node}->{name});
-        $temporary_fail ||= 1;
-      }
-      ban_node_by_slot($proc{$pid}->{slot});
+    my $childstatus = $?;
+
+    my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
+                    . "."
+                    . $slot[$proc{$pid}->{slot}]->{cpu});
+    my $jobstepidx = $proc{$pid}->{jobstepidx};
+
+    if (!WIFEXITED($childstatus))
+    {
+      # child did not exit (may be temporarily stopped)
+      Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
+      next;
     }
 
-    Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
-                             ++$Jobstep->{'failures'},
-                             $temporary_fail ? 'temporary' : 'permanent',
-                             $elapsed));
+    $children_reaped++;
+    my $elapsed = time - $proc{$pid}->{time};
+    my $Jobstep = $jobstep[$jobstepidx];
 
-    if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
-      # Give up on this task, and the whole job
-      $main::success = 0;
+    my $exitvalue = $childstatus >> 8;
+    my $exitinfo = "exit ".exit_status_s($childstatus);
+    $Jobstep->{'arvados_task'}->reload;
+    my $task_success = $Jobstep->{'arvados_task'}->{success};
+
+    Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
+
+    if (!defined $task_success) {
+      # task did not indicate one way or the other --> fail
+      Log($jobstepidx, sprintf(
+            "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
+            exit_status_s($childstatus)));
+      $Jobstep->{'arvados_task'}->{success} = 0;
+      $Jobstep->{'arvados_task'}->save;
+      $task_success = 0;
     }
-    # Put this task back on the todo queue
-    push @jobstep_todo, $jobstepidx;
-    $Job->{'tasks_summary'}->{'failed'}++;
-  }
-  else
-  {
-    ++$thisround_succeeded;
-    $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
-    $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
-    $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
-    push @jobstep_done, $jobstepidx;
-    Log ($jobstepidx, "success in $elapsed seconds");
-  }
-  $Jobstep->{exitcode} = $childstatus;
-  $Jobstep->{finishtime} = time;
-  $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
-  $Jobstep->{'arvados_task'}->save;
-  process_stderr_final ($jobstepidx);
-  Log ($jobstepidx, sprintf("task output (%d bytes): %s",
-                           length($Jobstep->{'arvados_task'}->{output}),
-                           $Jobstep->{'arvados_task'}->{output}));
 
-  close $reader{$jobstepidx};
-  delete $reader{$jobstepidx};
-  delete $slot[$proc{$pid}->{slot}]->{pid};
-  push @freeslot, $proc{$pid}->{slot};
-  delete $proc{$pid};
-
-  if ($task_success) {
-    # Load new tasks
-    my $newtask_list = [];
-    my $newtask_results;
-    do {
-      $newtask_results = api_call(
-        "job_tasks/list",
-        'where' => {
-          'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-        },
-        'order' => 'qsequence',
-        'offset' => scalar(@$newtask_list),
-      );
-      push(@$newtask_list, @{$newtask_results->{items}});
-    } while (@{$newtask_results->{items}});
-    foreach my $arvados_task (@$newtask_list) {
-      my $jobstep = {
-        'level' => $arvados_task->{'sequence'},
-        'failures' => 0,
-        'arvados_task' => $arvados_task
-      };
-      push @jobstep, $jobstep;
-      push @jobstep_todo, $#jobstep;
+    if (!$task_success)
+    {
+      my $temporary_fail;
+      $temporary_fail ||= $Jobstep->{tempfail};
+      $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
+
+      ++$thisround_failed;
+      ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
+
+      # Check for signs of a failed or misconfigured node
+      if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
+          2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
+        # Don't count this against jobstep failure thresholds if this
+        # node is already suspected faulty and srun exited quickly
+        if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
+            $elapsed < 5) {
+          Log ($jobstepidx, "blaming failure on suspect node " .
+               $slot[$proc{$pid}->{slot}]->{node}->{name});
+          $temporary_fail ||= 1;
+        }
+        ban_node_by_slot($proc{$pid}->{slot});
+      }
+
+      Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
+                                ++$Jobstep->{'failures'},
+                                $temporary_fail ? 'temporary' : 'permanent',
+                                $elapsed));
+
+      if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+        # Give up on this task, and the whole job
+        $main::success = 0;
+      }
+      # Put this task back on the todo queue
+      push @jobstep_todo, $jobstepidx;
+      $Job->{'tasks_summary'}->{'failed'}++;
     }
+    else
+    {
+      ++$thisround_succeeded;
+      $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
+      $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+      $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
+      push @jobstep_done, $jobstepidx;
+      Log ($jobstepidx, "success in $elapsed seconds");
+    }
+    $Jobstep->{exitcode} = $childstatus;
+    $Jobstep->{finishtime} = time;
+    $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
+    $Jobstep->{'arvados_task'}->save;
+    process_stderr_final ($jobstepidx);
+    Log ($jobstepidx, sprintf("task output (%d bytes): %s",
+                              length($Jobstep->{'arvados_task'}->{output}),
+                              $Jobstep->{'arvados_task'}->{output}));
+
+    close $reader{$jobstepidx};
+    delete $reader{$jobstepidx};
+    delete $slot[$proc{$pid}->{slot}]->{pid};
+    push @freeslot, $proc{$pid}->{slot};
+    delete $proc{$pid};
+
+    if ($task_success) {
+      # Load new tasks
+      my $newtask_list = [];
+      my $newtask_results;
+      do {
+        $newtask_results = api_call(
+          "job_tasks/list",
+          'where' => {
+            'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+          },
+          'order' => 'qsequence',
+          'offset' => scalar(@$newtask_list),
+            );
+        push(@$newtask_list, @{$newtask_results->{items}});
+      } while (@{$newtask_results->{items}});
+      foreach my $arvados_task (@$newtask_list) {
+        my $jobstep = {
+          'level' => $arvados_task->{'sequence'},
+          'failures' => 0,
+          'arvados_task' => $arvados_task
+        };
+        push @jobstep, $jobstep;
+        push @jobstep_todo, $#jobstep;
+      }
+    }
+    $progress_is_dirty = 1;
   }
 
-  $progress_is_dirty = 1;
-  1;
+  return $children_reaped;
 }
 
 sub check_refresh_wanted
@@ -1292,10 +1304,13 @@ sub check_squeue
   # squeue check interval (15s) this should make the squeue check an
   # infrequent event.
   my $silent_procs = 0;
-  for my $procinfo (values %proc)
+  for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
   {
-    my $jobstep = $jobstep[$procinfo->{jobstepidx}];
-    if ($jobstep->{stderr_at} < $last_squeue_check)
+    if (!exists($js->{stderr_at}))
+    {
+      $js->{stderr_at} = 0;
+    }
+    if ($js->{stderr_at} < $last_squeue_check)
     {
       $silent_procs++;
     }
@@ -1305,14 +1320,14 @@ sub check_squeue
   # use killem() on procs whose killtime is reached
   while (my ($pid, $procinfo) = each %proc)
   {
-    my $jobstep = $jobstep[$procinfo->{jobstepidx}];
+    my $js = $jobstep[$procinfo->{jobstepidx}];
     if (exists $procinfo->{killtime}
         && $procinfo->{killtime} <= time
-        && $jobstep->{stderr_at} < $last_squeue_check)
+        && $js->{stderr_at} < $last_squeue_check)
     {
       my $sincewhen = "";
-      if ($jobstep->{stderr_at}) {
-        $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+      if ($js->{stderr_at}) {
+        $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
       }
       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
       killem ($pid);
@@ -1383,27 +1398,37 @@ sub release_allocation
 sub readfrompipes
 {
   my $gotsome = 0;
+  my %fd_job;
+  my $sel = IO::Select->new();
   foreach my $jobstepidx (keys %reader)
+  {
+    my $fd = $reader{$jobstepidx};
+    $sel->add($fd);
+    $fd_job{$fd} = $jobstepidx;
+
+    if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
+      $sel->add($stdout_fd);
+      $fd_job{$stdout_fd} = $jobstepidx;
+    }
+  }
+  # select on all reader fds with 0.1s timeout
+  my @ready_fds = $sel->can_read(0.1);
+  foreach my $fd (@ready_fds)
   {
     my $buf;
-    if ($jobstep[$jobstepidx]->{stdout_r} &&
-        0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536))
+    if (0 < sysread ($fd, $buf, 65536))
     {
+      $gotsome = 1;
       print STDERR $buf if $ENV{CRUNCH_DEBUG};
-      if (exists $jobstep[$jobstepidx]->{stdout_captured}) {
+
+      my $jobstepidx = $fd_job{$fd};
+      if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+        next;
       }
-      $gotsome = 1;
-    }
-    if (0 < sysread ($reader{$jobstepidx}, $buf, 65536))
-    {
-      print STDERR $buf if $ENV{CRUNCH_DEBUG};
+
       $jobstep[$jobstepidx]->{stderr_at} = time;
       $jobstep[$jobstepidx]->{stderr} .= $buf;
-      if (exists $jobstep[$jobstepidx]->{stderr_captured}) {
-        $jobstep[$jobstepidx]->{stderr_captured} .= $buf;
-      }
-      $gotsome = 1;
 
       # Consume everything up to the last \n
       preprocess_stderr ($jobstepidx);