Merge branch '3198-inode-cache' refs #3198
[arvados.git] / sdk / cli / bin / crunch-job
index a6ccebcd91dd4d5f46221a575d1a1d4e0cb4da6e..6ae04812325994678dad9e29d736b5556731f4f4 100755 (executable)
@@ -643,12 +643,44 @@ my $thisround_failed_multiple = 0;
 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
 my $level = $jobstep[$jobstep_todo[0]]->{level};
-Log (undef, "start level $level");
 
+my $initial_tasks_this_level = 0;
+foreach my $id (@jobstep_todo) {
+  $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
+}
 
+# If the number of tasks scheduled at this level #T is smaller than the number
+# of slots available #S, only use the first #T slots, or the first slot on
+# each node, whichever number is greater.
+#
+# When we dispatch tasks later, we'll allocate whole-node resources like RAM
+# based on these numbers.  Using fewer slots makes more resources available
+# to each individual task, which should normally be a better strategy when
+# there are fewer of them running with less parallelism.
+#
+# Note that this calculation is not redone if the initial tasks at
+# this level queue more tasks at the same level.  This may harm
+# overall task throughput for that level.
+my @freeslot;
+if ($initial_tasks_this_level < @node) {
+  @freeslot = (0..$#node);
+} elsif ($initial_tasks_this_level < @slot) {
+  @freeslot = (0..$initial_tasks_this_level - 1);
+} else {
+  @freeslot = (0..$#slot);
+}
+my $round_num_freeslots = scalar(@freeslot);
 
+my %round_max_slots = ();
+for (my $ii = $#freeslot; $ii >= 0; $ii--) {
+  my $this_slot = $slot[$freeslot[$ii]];
+  my $node_name = $this_slot->{node}->{name};
+  $round_max_slots{$node_name} ||= $this_slot->{cpu};
+  last if (scalar(keys(%round_max_slots)) >= @node);
+}
+
+Log(undef, "start level $level with $round_num_freeslots slots");
 my %proc;
-my @freeslot = (0..$#slot);
 my @holdslot;
 my %reader;
 my $progress_is_dirty = 1;
@@ -657,12 +689,7 @@ my $progress_stats_updated = 0;
 update_progress_stats();
 
 
-
 THISROUND:
-my $tasks_this_level = 0;
-foreach my $id (@jobstep_todo) {
-  $tasks_this_level++ if ($jobstep[$id]->{level} == $level);
-}
 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 {
   my $id = $jobstep_todo[$todo_ptr];
@@ -715,16 +742,11 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     $ENV{"HOME"} = $ENV{"TASK_WORK"};
     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
-    $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+    $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
 
     $ENV{"GZIP"} = "-n";
 
-    my $max_node_concurrent_tasks = $ENV{CRUNCH_NODE_SLOTS};
-    if ($tasks_this_level < $max_node_concurrent_tasks) {
-      $max_node_concurrent_tasks = $tasks_this_level;
-    }
-
     my @srunargs = (
       "srun",
       "--nodelist=".$childnode->{name},
@@ -739,7 +761,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         # $command.  No tool is expected to read these values directly.
         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
-        ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($max_node_concurrent_tasks * 100) )) "
+        ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_hash)
@@ -859,7 +881,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 
   while (!@freeslot
         ||
-        (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
+        ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
   {
     last THISROUND if $main::please_freeze || defined($main::success);
     if ($main::please_info)