Merge branch 'master' into 4358-graph-not-comparing
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
index 9f22568faafbb1c45d9625816fb1c0027226882a..53af9339f0d010cae9839a242db3b5750ae56eaf 100644 (file)
@@ -26,14 +26,16 @@ class _BaseNodeTracker(object):
         self.nodes = {}
         self.orphans = {}
 
-    def __getitem__(self, key):
-        return self.nodes[key]
-
-    def __len__(self):
-        return len(self.nodes)
+    # Proxy the methods listed below to self.nodes.
+    def _proxy_method(name):
+        # Helper used in the class body (hence no self parameter): builds a
+        # method that forwards to the dict method called `name`, applied to
+        # self.nodes instead of the tracker itself.
+        method = getattr(dict, name)
+        # Only __name__ and __doc__ are copied onto the wrapper.
+        # NOTE(review): presumably the default attribute list is avoided
+        # because built-in dict methods lack some of those attributes
+        # (e.g. __module__) on Python 2 -- confirm.
+        @functools.wraps(method, ('__name__', '__doc__'))
+        def wrapper(self, *args, **kwargs):
+            return method(self.nodes, *args, **kwargs)
+        return wrapper
 
-    def get(self, key, default=None):
-        return self.nodes.get(key, default)
+    # Build one proxy per listed dict method and store it under the same
+    # name in the namespace returned by locals().
+    # NOTE(review): this appears to rely on locals() in a class body being
+    # the class namespace; the Python docs caution against modifying the
+    # locals() dictionary -- confirm on every interpreter that is supported.
+    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
+        locals()[_method_name] = _proxy_method(_method_name)
 
     def record_key(self, record):
+        """Return item_key() applied to record's RECORD_ATTR attribute."""
         return self.item_key(getattr(record, self.RECORD_ATTR))
@@ -96,7 +98,9 @@ class NodeManagerDaemonActor(actor_class):
                  cloud_nodes_actor, cloud_update_actor, timer_actor,
                  arvados_factory, cloud_factory,
                  shutdown_windows, min_nodes, max_nodes,
-                 poll_stale_after=600, node_stale_after=7200,
+                 poll_stale_after=600,
+                 boot_fail_after=1800,
+                 node_stale_after=7200,
                  node_setup_class=dispatch.ComputeNodeSetupActor,
                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                  node_actor_class=dispatch.ComputeNodeMonitorActor):
@@ -115,6 +119,7 @@ class NodeManagerDaemonActor(actor_class):
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
         self.poll_stale_after = poll_stale_after
+        self.boot_fail_after = boot_fail_after
         self.node_stale_after = node_stale_after
         self.last_polls = {}
         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
@@ -174,6 +179,7 @@ class NodeManagerDaemonActor(actor_class):
                     break
         for key, record in self.cloud_nodes.orphans.iteritems():
             record.actor.stop()
+            record.cloud_node = None
             self.shutdowns.pop(key, None)
 
     def update_arvados_nodes(self, nodelist):
@@ -269,15 +275,15 @@ class NodeManagerDaemonActor(actor_class):
         return pykka.get_all([getattr(actor, name) for name in attr_names])
 
     def node_up(self, setup_proxy):
+        """Retire a setup actor and start the boot-failure timer for its node.
+
+        Fetches the setup actor's cloud_node, removes the actor from
+        self.booting and stops it.  If self.cloud_nodes has no record for
+        that node, a new record is created with _new_node() and stored in
+        self.booted under the cloud node's id.
+        """
-        cloud_node, arvados_node = self._get_actor_attrs(
-            setup_proxy, 'cloud_node', 'arvados_node')
+        cloud_node = setup_proxy.cloud_node.get()
         del self.booting[setup_proxy.actor_ref.actor_urn]
         setup_proxy.stop()
         record = self.cloud_nodes.get(cloud_node.id)
         if record is None:
             record = self._new_node(cloud_node)
             self.booted[cloud_node.id] = record
-        self._pair_nodes(record, arvados_node)
+        # Hand shutdown_unpaired_node and the node id to the timer, with a
+        # target time boot_fail_after seconds from now.
+        # NOTE(review): presumably the timer invokes the callback at that
+        # time -- confirm against the timer actor's schedule().
+        self._timer.schedule(time.time() + self.boot_fail_after,
+                             self._later.shutdown_unpaired_node, cloud_node.id)
 
     @_check_poll_freshness
     def stop_booting_node(self):
@@ -292,19 +298,31 @@ class NodeManagerDaemonActor(actor_class):
                     self._later.stop_booting_node()
                 break
 
-    @_check_poll_freshness
-    def node_can_shutdown(self, node_actor):
-        if self._nodes_excess() < 1:
-            return None
+    def _begin_node_shutdown(self, node_actor, cancellable):
+        """Start a shutdown actor for node_actor's cloud node.
+
+        Does nothing if self.shutdowns already has an entry for the cloud
+        node's id.  Otherwise starts self._node_shutdown with this daemon's
+        timer, a new cloud client, and node_actor's actor_ref as the node
+        monitor; `cancellable` is passed through unchanged.  The new actor's
+        proxy is stored in self.shutdowns under the cloud node id, and
+        node_finished_shutdown is subscribed to it.  Always returns None.
+        """
         cloud_node_id = node_actor.cloud_node.get().id
         if cloud_node_id in self.shutdowns:
             return None
         shutdown = self._node_shutdown.start(
             timer_actor=self._timer, cloud_client=self._new_cloud(),
-            node_monitor=node_actor.actor_ref).proxy()
+            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
         self.shutdowns[cloud_node_id] = shutdown
         shutdown.subscribe(self._later.node_finished_shutdown)
 
+    @_check_poll_freshness
+    def node_can_shutdown(self, node_actor):
+        """Begin a cancellable shutdown of node_actor's node if nodes are in excess.
+
+        The shutdown is started only when _nodes_excess() is greater than
+        zero; otherwise nothing happens.
+        NOTE(review): _check_poll_freshness is defined outside this view and
+        may skip the call entirely -- confirm its behaviour.
+        """
+        if self._nodes_excess() > 0:
+            self._begin_node_shutdown(node_actor, cancellable=True)
+
+    def shutdown_unpaired_node(self, cloud_node_id):
+        """Begin a non-cancellable shutdown of a node that has no arvados_node.
+
+        Looks for cloud_node_id in self.cloud_nodes first, then in
+        self.booted.  Returns None without doing anything if neither holds
+        it, or if the record found has an arvados_node set.
+        """
+        for record_dict in [self.cloud_nodes, self.booted]:
+            if cloud_node_id in record_dict:
+                record = record_dict[cloud_node_id]
+                break
+        else:
+            # Reached only when the loop ended without break, i.e. the id
+            # is in neither collection.
+            return None
+        if record.arvados_node is None:
+            self._begin_node_shutdown(record.actor, cancellable=False)
+
     def node_finished_shutdown(self, shutdown_actor):
         success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
                                                     'cloud_node')