16625: implement some more review feedback.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
index 73b58bfe65fc0cc871579a22500424d2ec2f87fc..1edf4dc4792e5b7a9f638a17599c66c76410ab84 100644 (file)
@@ -112,7 +112,8 @@ class NodeManagerDaemonActor(actor_class):
                  node_setup_class=dispatch.ComputeNodeSetupActor,
                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                  node_actor_class=dispatch.ComputeNodeMonitorActor,
-                 max_total_price=0):
+                 max_total_price=0,
+                 consecutive_idle_count=1):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
@@ -133,6 +134,7 @@ class NodeManagerDaemonActor(actor_class):
         self.poll_stale_after = poll_stale_after
         self.boot_fail_after = boot_fail_after
         self.node_stale_after = node_stale_after
+        self.consecutive_idle_count = consecutive_idle_count
         self.last_polls = {}
         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
             poll_actor = locals()[poll_name + '_actor']
@@ -173,7 +175,8 @@ class NodeManagerDaemonActor(actor_class):
             poll_stale_after=self.poll_stale_after,
             node_stale_after=self.node_stale_after,
             cloud_client=self._cloud_driver,
-            boot_fail_after=self.boot_fail_after)
+            boot_fail_after=self.boot_fail_after,
+            consecutive_idle_count=self.consecutive_idle_count)
         actorTell = actor.tell_proxy()
         actorTell.subscribe(self._later.node_can_shutdown)
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
@@ -215,8 +218,11 @@ class NodeManagerDaemonActor(actor_class):
             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
                 self.cloud_nodes.add(record)
             else:
-                # Node disappeared from the cloud node list.  Stop the monitor
-                # actor if necessary and forget about the node.
+                # Node disappeared from the cloud node list. If it's paired,
+                # remove its idle time counter.
+                if record.arvados_node:
+                    status.tracker.idle_out(record.arvados_node.get('hostname'))
+                # Stop the monitor actor if necessary and forget about the node.
                 if record.actor:
                     try:
                         record.actor.stop()
@@ -270,6 +276,7 @@ class NodeManagerDaemonActor(actor_class):
             updates.setdefault('nodes_'+s, 0)
             updates['nodes_'+s] += 1
         updates['nodes_wish'] = len(self.last_wishlist)
+        updates['node_quota'] = self.node_quota
         status.tracker.update(updates)
 
     def _state_counts(self, size):
@@ -314,7 +321,7 @@ class NodeManagerDaemonActor(actor_class):
         busy_count = counts["busy"]
         wishlist_count = self._size_wishlist(size)
 
-        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
+        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.id,
                           wishlist_count,
                           up_count,
                           counts["booting"],
@@ -334,7 +341,7 @@ class NodeManagerDaemonActor(actor_class):
             can_boot = int((self.max_total_price - total_price) / size.price)
             if can_boot == 0:
                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
-                                  size.name, size.price, self.max_total_price, total_price)
+                                  size.id, size.price, self.max_total_price, total_price)
             return can_boot
         else:
             return wanted
@@ -348,7 +355,8 @@ class NodeManagerDaemonActor(actor_class):
 
     def update_server_wishlist(self, wishlist):
         self._update_poll_time('server_wishlist')
-        self.last_wishlist = wishlist
+        requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
+        self.last_wishlist = wishlist[:requestable_nodes]
         for size in reversed(self.server_calculator.cloud_sizes):
             try:
                 nodes_wanted = self._nodes_wanted(size)
@@ -356,7 +364,7 @@ class NodeManagerDaemonActor(actor_class):
                     self._later.start_node(size)
                 elif (nodes_wanted < 0) and self.booting:
                     self._later.stop_booting_node(size)
-            except Exception as e:
+            except Exception:
                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
         try:
             self._update_tracker()
@@ -385,22 +393,25 @@ class NodeManagerDaemonActor(actor_class):
         nodes_wanted = self._nodes_wanted(cloud_size)
         if nodes_wanted < 1:
             return None
-        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
-        self._logger.info("Want %i more %s nodes.  Booting a node.",
-                          nodes_wanted, cloud_size.name)
-        new_setup = self._node_setup.start(
-            timer_actor=self._timer,
-            arvados_client=self._new_arvados(),
-            arvados_node=arvados_node,
-            cloud_client=self._new_cloud(),
-            cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
-        self.booting[new_setup.actor_ref.actor_urn] = new_setup
-        self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
-
-        if arvados_node is not None:
-            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
-                time.time())
-        new_setup.subscribe(self._later.node_setup_finished)
+
+        if not self.cancel_node_shutdown(cloud_size):
+            arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
+            self._logger.info("Want %i more %s nodes.  Booting a node.",
+                              nodes_wanted, cloud_size.id)
+            new_setup = self._node_setup.start(
+                timer_actor=self._timer,
+                arvados_client=self._new_arvados(),
+                arvados_node=arvados_node,
+                cloud_client=self._new_cloud(),
+                cloud_size=self.server_calculator.find_size(cloud_size.id))
+            self.booting[new_setup.actor_urn] = new_setup.proxy()
+            self.sizes_booting[new_setup.actor_urn] = cloud_size
+
+            if arvados_node is not None:
+                self.arvados_nodes[arvados_node['uuid']].assignment_time = (
+                    time.time())
+            new_setup.tell_proxy().subscribe(self._later.node_setup_finished)
+
         if nodes_wanted > 1:
             self._later.start_node(cloud_size)
 
@@ -451,13 +462,28 @@ class NodeManagerDaemonActor(actor_class):
         if (nodes_excess < 1) or not self.booting:
             return None
         for key, node in self.booting.iteritems():
-            if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
-                del self.booting[key]
-                del self.sizes_booting[key]
+            try:
+                if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(2):
+                    del self.booting[key]
+                    del self.sizes_booting[key]
+                    if nodes_excess > 1:
+                        self._later.stop_booting_node(size)
+                    return
+            except pykka.Timeout:
+                pass
 
-                if nodes_excess > 1:
-                    self._later.stop_booting_node(size)
-                break
+    @_check_poll_freshness
+    def cancel_node_shutdown(self, size):
+        # Go through shutdown actors and see if there are any of the appropriate size that can be cancelled
+        for record in self.cloud_nodes.nodes.itervalues():
+            try:
+                if (record.shutdown_actor is not None and
+                    record.cloud_node.size.id == size.id and
+                    record.shutdown_actor.cancel_shutdown("Node size is in wishlist").get(2)):
+                        return True
+            except (pykka.ActorDeadError, pykka.Timeout) as e:
+                pass
+        return False
 
     def _begin_node_shutdown(self, node_actor, cancellable):
         cloud_node_obj = node_actor.cloud_node.get()