7286: Add test that "missing" nodes are not counted towards "busy" (but are
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index 1608b529fb848e00ac8968ef8a04a427e6f4e0a7..455719832d7da5357273d4c23574f8a02b6fe237 100644 (file)
@@ -9,7 +9,8 @@ import time
 import libcloud.common.types as cloud_types
 import pykka
 
 import libcloud.common.types as cloud_types
 import pykka
 
-from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from .. import \
+    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
 from ...clientactor import _notify_subscribers
 from ... import config
 
 from ...clientactor import _notify_subscribers
 from ... import config
 
@@ -19,12 +20,13 @@ class ComputeNodeStateChangeBase(config.actor_class):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, cloud_client, timer_actor,
+    def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger(logger_name)
         self._cloud = cloud_client
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger(logger_name)
         self._cloud = cloud_client
+        self._arvados = arvados_client
         self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
@@ -78,6 +80,18 @@ class ComputeNodeStateChangeBase(config.actor_class):
         else:
             self.subscribers.add(subscriber)
 
         else:
             self.subscribers.add(subscriber)
 
+    def _clean_arvados_node(self, arvados_node, explanation):
+        return self._arvados.nodes().update(
+            uuid=arvados_node['uuid'],
+            body={'hostname': None,
+                  'ip_address': None,
+                  'slot_number': None,
+                  'first_ping_at': None,
+                  'last_ping_at': None,
+                  'info': {'ec2_instance_id': None,
+                           'last_action': explanation}},
+            ).execute()
+
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
@@ -92,9 +106,8 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', cloud_client, timer_actor,
+            'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
             retry_wait, max_retry_wait)
-        self._arvados = arvados_client
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
@@ -103,23 +116,15 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @ComputeNodeStateChangeBase._retry()
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry()
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
     def prepare_arvados_node(self, node):
-        self.arvados_node = self._arvados.nodes().update(
-            uuid=node['uuid'],
-            body={'hostname': None,
-                  'ip_address': None,
-                  'slot_number': None,
-                  'first_ping_at': None,
-                  'last_ping_at': None,
-                  'info': {'ec2_instance_id': None,
-                           'last_action': "Prepared by Node Manager"}}
-            ).execute()
+        self.arvados_node = self._clean_arvados_node(
+            node, "Prepared by Node Manager")
         self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._retry()
         self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._retry()
@@ -138,8 +143,10 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         self._finished()
 
     def stop_if_no_cloud_node(self):
         self._finished()
 
     def stop_if_no_cloud_node(self):
-        if self.cloud_node is None:
-            self.stop()
+        if self.cloud_node is not None:
+            return False
+        self.stop()
+        return True
 
 
 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
 
 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
@@ -147,7 +154,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     This actor simply destroys a cloud node, retrying as needed.
     """
 
     This actor simply destroys a cloud node, retrying as needed.
     """
-    def __init__(self, timer_actor, cloud_client, node_monitor,
+    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                  cancellable=True, retry_wait=1, max_retry_wait=180):
         # If a ShutdownActor is cancellable, it will ask the
         # ComputeNodeMonitorActor if it's still eligible before taking each
                  cancellable=True, retry_wait=1, max_retry_wait=180):
         # If a ShutdownActor is cancellable, it will ask the
         # ComputeNodeMonitorActor if it's still eligible before taking each
@@ -155,7 +162,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', cloud_client, timer_actor,
+            'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
@@ -165,9 +172,16 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     def on_start(self):
         self._later.shutdown_node()
 
     def on_start(self):
         self._later.shutdown_node()
 
+    def _arvados_node(self):
+        return self._monitor.arvados_node.get()
+
+    def _finished(self, success_flag=None):
+        if success_flag is not None:
+            self.success = success_flag
+        return super(ComputeNodeShutdownActor, self)._finished()
+
     def cancel_shutdown(self):
     def cancel_shutdown(self):
-        self.success = False
-        self._finished()
+        self._finished(success_flag=False)
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
@@ -186,13 +200,20 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     @_stop_if_window_closed
     @ComputeNodeStateChangeBase._retry()
     def shutdown_node(self):
     @_stop_if_window_closed
     @ComputeNodeStateChangeBase._retry()
     def shutdown_node(self):
-        if self._cloud.destroy_node(self.cloud_node):
-            self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
-            self.success = True
-            self._finished()
-        else:
+        if not self._cloud.destroy_node(self.cloud_node):
             # Force a retry.
             raise cloud_types.LibcloudError("destroy_node failed")
             # Force a retry.
             raise cloud_types.LibcloudError("destroy_node failed")
+        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+        arv_node = self._arvados_node()
+        if arv_node is None:
+            self._finished(success_flag=True)
+        else:
+            self._later.clean_arvados_node(arv_node)
+
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    def clean_arvados_node(self, arvados_node):
+        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
+        self._finished(success_flag=True)
 
     # Make the decorator available to subclasses.
     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
 
     # Make the decorator available to subclasses.
     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
@@ -252,19 +273,24 @@ class ComputeNodeMonitorActor(config.actor_class):
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 timer_actor, update_actor, arvados_node=None,
-                 poll_stale_after=600, node_stale_after=3600):
+                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
+                 boot_fail_after=1800
+    ):
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger('arvnodeman.computenode')
         self._last_log = None
         self._shutdowns = shutdown_timer
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger('arvnodeman.computenode')
         self._last_log = None
         self._shutdowns = shutdown_timer
+        self._cloud_node_fqdn = cloud_fqdn_func
         self._timer = timer_actor
         self._update = update_actor
         self._timer = timer_actor
         self._update = update_actor
+        self._cloud = cloud_client
         self.cloud_node = cloud_node
         self.cloud_node_start_time = cloud_node_start_time
         self.poll_stale_after = poll_stale_after
         self.node_stale_after = node_stale_after
         self.cloud_node = cloud_node
         self.cloud_node_start_time = cloud_node_start_time
         self.poll_stale_after = poll_stale_after
         self.node_stale_after = node_stale_after
+        self.boot_fail_after = boot_fail_after
         self.subscribers = set()
         self.arvados_node = None
         self._later.update_arvados_node(arvados_node)
         self.subscribers = set()
         self.arvados_node = None
         self._later.update_arvados_node(arvados_node)
@@ -300,10 +326,13 @@ class ComputeNodeMonitorActor(config.actor_class):
         if not self._shutdowns.window_open():
             return False
         elif self.arvados_node is None:
         if not self._shutdowns.window_open():
             return False
         elif self.arvados_node is None:
-            # If this is a new, unpaired node, it's eligible for
-            # shutdown--we figure there was an error during bootstrap.
-            return timestamp_fresh(self.cloud_node_start_time,
-                                   self.node_stale_after)
+            # Node is unpaired.
+            # If it hasn't pinged Arvados after boot_fail_after seconds, shut it down.
+            return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
+        elif self.arvados_node.get('status') == "missing" and self._cloud.broken(self.cloud_node):
+            # Node is paired, but Arvados says it is missing and the cloud says the node
+            # is in an error state, so shut it down.
+            return True
         else:
             return self.in_state('idle')
 
         else:
             return self.in_state('idle')
 
@@ -322,9 +351,11 @@ class ComputeNodeMonitorActor(config.actor_class):
             self.last_shutdown_opening = next_opening
 
     def offer_arvados_pair(self, arvados_node):
             self.last_shutdown_opening = next_opening
 
     def offer_arvados_pair(self, arvados_node):
-        if self.arvados_node is not None:
+        first_ping_s = arvados_node.get('first_ping_at')
+        if (self.arvados_node is not None) or (not first_ping_s):
             return None
             return None
-        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id
         else:
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id
         else:
@@ -336,9 +367,17 @@ class ComputeNodeMonitorActor(config.actor_class):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
+        # If the cloud node's FQDN doesn't match what's in the Arvados node
+        # record, make them match.
+        # This method is a little unusual in the way it just fires off the
+        # request without checking the result or retrying errors.  That's
+        # because this update happens every time we reload the Arvados node
+        # list: if a previous sync attempt failed, we'll see that the names
+        # are out of sync and just try again.  ComputeNodeUpdateActor has
+        # the logic to throttle those effective retries when there's trouble.
         if arvados_node is not None:
             self.arvados_node = arvados_node
         if arvados_node is not None:
             self.arvados_node = arvados_node
-            new_hostname = arvados_node_fqdn(self.arvados_node)
-            if new_hostname != self.cloud_node.name:
+            if (self._cloud_node_fqdn(self.cloud_node) !=
+                  arvados_node_fqdn(self.arvados_node)):
                 self._update.sync_node(self.cloud_node, self.arvados_node)
             self._later.consider_shutdown()
                 self._update.sync_node(self.cloud_node, self.arvados_node)
             self._later.consider_shutdown()