Merge branch '4499-one-task-per-input-file-normalize'
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
index 08ee12e1ade5947c4cb3d8759c36ae32692330af..239934f52911782730840d3d1aa5042866345451 100644 (file)
@@ -2,6 +2,8 @@
 
 from __future__ import absolute_import, print_function
 
+import logging
+
 from . import clientactor
 from .config import ARVADOS_ERRORS
 
@@ -36,11 +38,14 @@ class ServerCalculator(object):
             return True
 
 
-    def __init__(self, server_list, max_nodes=None):
+    def __init__(self, server_list, min_nodes=0, max_nodes=None):
         self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
                             for s, kws in server_list]
         self.cloud_sizes.sort(key=lambda s: s.price)
-        self.max_nodes = max_nodes or float("inf")
+        self.min_nodes = min_nodes
+        self.max_nodes = max_nodes or float('inf')
+        self.logger = logging.getLogger('arvnodeman.jobqueue')
+        self.logged_jobs = set()
 
     @staticmethod
     def coerce_int(x, fallback):
@@ -61,12 +66,26 @@ class ServerCalculator(object):
 
     def servers_for_queue(self, queue):
         servers = []
+        seen_jobs = set()
         for job in queue:
+            seen_jobs.add(job['uuid'])
             constraints = job['runtime_constraints']
             want_count = self.coerce_int(constraints.get('min_nodes'), 1)
             cloud_size = self.cloud_size_for_constraints(constraints)
-            if (want_count < self.max_nodes) and (cloud_size is not None):
+            if cloud_size is None:
+                if job['uuid'] not in self.logged_jobs:
+                    self.logged_jobs.add(job['uuid'])
+                    self.logger.debug("job %s not satisfiable", job['uuid'])
+            elif (want_count <= self.max_nodes):
                 servers.extend([cloud_size.real] * max(1, want_count))
+        self.logged_jobs.intersection_update(seen_jobs)
+
+        # Make sure the server queue has at least enough entries to
+        # satisfy min_nodes.
+        node_shortfall = self.min_nodes - len(servers)
+        if node_shortfall > 0:
+            basic_node = self.cloud_size_for_constraints({})
+            servers.extend([basic_node.real] * node_shortfall)
         return servers
 
 
@@ -92,5 +111,5 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     def _got_response(self, queue):
         server_list = self._calculator.servers_for_queue(queue)
         self._logger.debug("Sending server wishlist: %s",
-                           ', '.join(s.name for s in server_list))
+                           ', '.join(s.name for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)