7661: added test with only_pdh (not working yet)
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
index 30592abe87c37d5f87f5732fd703c1ca82e027d3..a65e9a0705d1cd5941140a20dbd4c6d4e1f5fd57 100644 (file)
@@ -25,6 +25,7 @@ class _BaseNodeTracker(object):
     def __init__(self):
         self.nodes = {}
         self.orphans = {}
+        self._blacklist = set()
 
     # Proxy the methods listed below to self.nodes.
     def _proxy_method(name):
@@ -43,6 +44,9 @@ class _BaseNodeTracker(object):
     def add(self, record):
         self.nodes[self.record_key(record)] = record
 
+    def blacklist(self, key):
+        self._blacklist.add(key)
+
     def update_record(self, key, item):
         setattr(self.nodes[key], self.RECORD_ATTR, item)
 
@@ -50,7 +54,9 @@ class _BaseNodeTracker(object):
         unseen = set(self.nodes.iterkeys())
         for item in response:
             key = self.item_key(item)
-            if key in unseen:
+            if key in self._blacklist:
+                continue
+            elif key in unseen:
                 unseen.remove(key)
                 self.update_record(key, item)
             else:
@@ -182,9 +188,14 @@ class NodeManagerDaemonActor(actor_class):
                     self._pair_nodes(record, arv_rec.arvados_node)
                     break
         for key, record in self.cloud_nodes.orphans.iteritems():
+            if key in self.shutdowns:
+                try:
+                    self.shutdowns[key].stop().get()
+                except pykka.ActorDeadError:
+                    pass
+                del self.shutdowns[key]
             record.actor.stop()
             record.cloud_node = None
-            self.shutdowns.pop(key, None)
 
     def update_arvados_nodes(self, nodelist):
         self._update_poll_time('arvados_nodes')
@@ -212,7 +223,8 @@ class NodeManagerDaemonActor(actor_class):
     def _nodes_missing(self):
         return sum(1 for arv_node in
                    pykka.get_all(rec.actor.arvados_node for rec in
-                                 self.cloud_nodes.nodes.itervalues())
+                                 self.cloud_nodes.nodes.itervalues()
+                                 if rec.actor.cloud_node.get().id not in self.shutdowns)
                    if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
 
     def _nodes_wanted(self):
@@ -228,7 +240,7 @@ class NodeManagerDaemonActor(actor_class):
             return len(self.last_wishlist) - up_count
 
     def _nodes_excess(self):
-        up_count = self._nodes_up() - len(self.shutdowns) - self._nodes_missing()
+        up_count = self._nodes_up() - len(self.shutdowns)
         over_min = up_count - self.min_nodes
         if over_min <= 0:
             return over_min
@@ -340,11 +352,13 @@ class NodeManagerDaemonActor(actor_class):
             self._begin_node_shutdown(record.actor, cancellable=False)
 
     def node_finished_shutdown(self, shutdown_actor):
-        success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
-                                                    'cloud_node')
+        cloud_node, success, cancel_reason = self._get_actor_attrs(
+            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
         shutdown_actor.stop()
         cloud_node_id = cloud_node.id
         if not success:
+            if cancel_reason == self._node_shutdown.NODE_BROKEN:
+                self.cloud_nodes.blacklist(cloud_node_id)
             del self.shutdowns[cloud_node_id]
         elif cloud_node_id in self.booted:
             self.booted.pop(cloud_node_id).actor.stop()