Merge branch 'master' into 8465-stderr-redirection
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
index 266b66545f6b4ede822915b96113883ff3ebec0a..9bfee79b59bae21968064b995e5cd87df7d7c7b9 100644 (file)
@@ -9,6 +9,7 @@ import time
 import pykka
 
 from . import computenode as cnode
+from . import status
 from .computenode import dispatch
 from .config import actor_class
 
@@ -25,7 +26,6 @@ class _BaseNodeTracker(object):
     def __init__(self):
         self.nodes = {}
         self.orphans = {}
-        self._blacklist = set()
 
     # Proxy the methods listed below to self.nodes.
     def _proxy_method(name):
@@ -44,9 +44,6 @@ class _BaseNodeTracker(object):
     def add(self, record):
         self.nodes[self.record_key(record)] = record
 
-    def blacklist(self, key):
-        self._blacklist.add(key)
-
     def update_record(self, key, item):
         setattr(self.nodes[key], self.RECORD_ATTR, item)
 
@@ -54,9 +51,7 @@ class _BaseNodeTracker(object):
         unseen = set(self.nodes.iterkeys())
         for item in response:
             key = self.item_key(item)
-            if key in self._blacklist:
-                continue
-            elif key in unseen:
+            if key in unseen:
                 unseen.remove(key)
                 self.update_record(key, item)
             else:
@@ -67,10 +62,6 @@ class _BaseNodeTracker(object):
         return (record for record in self.nodes.itervalues()
                 if getattr(record, self.PAIR_ATTR) is None)
 
-    def paired(self):
-        return (record for record in self.nodes.itervalues()
-                if getattr(record, self.PAIR_ATTR) is not None)
-
 
 class _CloudNodeTracker(_BaseNodeTracker):
     RECORD_ATTR = 'cloud_node'
@@ -218,7 +209,14 @@ class NodeManagerDaemonActor(actor_class):
             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
                 self.cloud_nodes.add(record)
             else:
-                record.actor.stop()
+                # Node disappeared from the cloud node list.  Stop the monitor
+                # actor if necessary and forget about the node.
+                if record.actor:
+                    try:
+                        record.actor.stop()
+                    except pykka.ActorDeadError:
+                        pass
+                    record.actor = None
                 record.cloud_node = None
 
     def _register_arvados_node(self, key, arv_node):
@@ -235,7 +233,7 @@ class NodeManagerDaemonActor(actor_class):
     def try_pairing(self):
         for record in self.cloud_nodes.unpaired():
             for arv_rec in self.arvados_nodes.unpaired():
-                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
+                if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                     self._pair_nodes(record, arv_rec.arvados_node)
                     break
 
@@ -246,14 +244,27 @@ class NodeManagerDaemonActor(actor_class):
         return s
 
     def _node_states(self, size):
-        states = pykka.get_all(rec.actor.get_state()
-                               for rec in self.cloud_nodes.nodes.itervalues()
-                               if ((size is None or rec.cloud_node.size.id == size.id) and
-                                   rec.shutdown_actor is None))
-        states += ['shutdown' for rec in self.cloud_nodes.nodes.itervalues()
-                   if ((size is None or rec.cloud_node.size.id == size.id) and
-                       rec.shutdown_actor is not None)]
-        return states
+        proxy_states = []
+        states = []
+        for rec in self.cloud_nodes.nodes.itervalues():
+            if size is None or rec.cloud_node.size.id == size.id:
+                if rec.shutdown_actor is None and rec.actor is not None:
+                    proxy_states.append(rec.actor.get_state())
+                else:
+                    states.append("shutdown")
+        return states + pykka.get_all(proxy_states)
+
+    def _update_tracker(self):
+        updates = {
+            k: 0
+            for k in status.tracker.keys()
+            if k.startswith('nodes_')
+        }
+        for s in self._node_states(size=None):
+            updates.setdefault('nodes_'+s, 0)
+            updates['nodes_'+s] += 1
+        updates['nodes_wish'] = len(self.last_wishlist)
+        status.tracker.update(updates)
 
     def _state_counts(self, size):
         states = self._node_states(size)
@@ -275,9 +286,9 @@ class NodeManagerDaemonActor(actor_class):
 
     def _total_price(self):
         cost = 0
-        cost += sum(self.server_calculator.find_size(self.sizes_booting[c].id).price
-                  for c in self.booting.iterkeys())
-        cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
+        cost += sum(self.sizes_booting[c].price
+                    for c in self.booting.iterkeys())
+        cost += sum(c.cloud_node.size.price
                     for c in self.cloud_nodes.nodes.itervalues())
         return cost
 
@@ -338,7 +349,11 @@ class NodeManagerDaemonActor(actor_class):
                 elif (nodes_wanted < 0) and self.booting:
                     self._later.stop_booting_node(size)
             except Exception as e:
-                self._logger.exception("while calculating nodes wanted for size %s", size)
+                self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
+        try:
+            self._update_tracker()
+        except:
+            self._logger.exception("while updating tracker")
 
     def _check_poll_freshness(orig_func):
         """Decorator to inhibit a method when poll information is stale.
@@ -428,34 +443,51 @@ class NodeManagerDaemonActor(actor_class):
 
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
-        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
-            self._begin_node_shutdown(node_actor, cancellable=True)
-        elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
-            # Node is unpaired, which means it probably exceeded its booting
-            # grace period without a ping, so shut it down so we can boot a new
-            # node in its place.
-            self._begin_node_shutdown(node_actor, cancellable=False)
-        elif node_actor.in_state('down').get():
-            # Node is down and unlikely to come back.
-            self._begin_node_shutdown(node_actor, cancellable=False)
+        try:
+            if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
+                self._begin_node_shutdown(node_actor, cancellable=True)
+            elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
+                # Node is unpaired, which means it probably exceeded its booting
+                # grace period without a ping, so shut it down so we can boot a new
+                # node in its place.
+                self._begin_node_shutdown(node_actor, cancellable=False)
+            elif node_actor.in_state('down').get():
+                # Node is down and unlikely to come back.
+                self._begin_node_shutdown(node_actor, cancellable=False)
+        except pykka.ActorDeadError as e:
+            # The monitor actor sends shutdown suggestions every time the
+            # node's state is updated, and these go into the daemon actor's
+            # message queue.  It's possible that the node has already been shut
+            # down (which shuts down the node monitor actor).  In that case,
+            # this message is stale and we'll get ActorDeadError when we try to
+            # access node_actor.  Log the error.
+            self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
 
     def node_finished_shutdown(self, shutdown_actor):
-        cloud_node, success, cancel_reason = self._get_actor_attrs(
-            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
+        try:
+            cloud_node, success = self._get_actor_attrs(
+                shutdown_actor, 'cloud_node', 'success')
+        except pykka.ActorDeadError:
+            return
         cloud_node_id = cloud_node.id
         record = self.cloud_nodes[cloud_node_id]
         shutdown_actor.stop()
+        record.shutdown_actor = None
+
         if not success:
-            if cancel_reason == self._node_shutdown.NODE_BROKEN:
-                self.cloud_nodes.blacklist(cloud_node_id)
-            record.shutdown_actor = None
-        else:
-            # If the node went from being booted to being shut down without ever
-            # appearing in the cloud node list, it will have the
-            # _nodemanager_recently_booted tag, so get rid of it so that the node
-            # can be forgotten completely.
-            if hasattr(self.cloud_nodes[cloud_node_id].cloud_node, "_nodemanager_recently_booted"):
-                del self.cloud_nodes[cloud_node_id].cloud_node._nodemanager_recently_booted
+            return
+
+        # Shutdown was successful, so stop the monitor actor, otherwise it
+        # will keep offering the node as a candidate for shutdown.
+        record.actor.stop()
+        record.actor = None
+
+        # If the node went from being booted to being shut down without ever
+        # appearing in the cloud node list, it will have the
+        # _nodemanager_recently_booted tag, so get rid of it so that the node
+        # can be forgotten completely.
+        if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
+            del record.cloud_node._nodemanager_recently_booted
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")