9303: Fetch arv_node before trying to shut down node, because monitor actor may
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index 8c983c1ca4042726881ce0e9019da0e9c2df9b60..a950210aa89c0bbf18e9df64815393bfae71c65a 100644 (file)
@@ -14,6 +14,7 @@ from .. import \
     arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
 from ... import config
     arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
 from ... import config
+from .transitions import transitions
 
 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     """Base class for actors that change a compute node's state.
 
 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     """Base class for actors that change a compute node's state.
@@ -21,27 +22,32 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
+    def __init__(self, cloud_client, arvados_client, timer_actor,
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
-        RetryMixin.__init__(self,
-                            retry_wait,
-                            max_retry_wait,
-                            logging.getLogger(logger_name),
-                            cloud_client,
-                            timer_actor)
-        self._later = self.actor_ref.proxy()
+        RetryMixin.__init__(self, retry_wait, max_retry_wait,
+                            None, cloud_client, timer_actor)
+        self._later = self.actor_ref.tell_proxy()
         self._arvados = arvados_client
         self.subscribers = set()
 
         self._arvados = arvados_client
         self.subscribers = set()
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+    def on_start(self):
+        self._set_logger()
+
     def _finished(self):
     def _finished(self):
-        _notify_subscribers(self._later, self.subscribers)
+        if self.subscribers is None:
+            raise Exception("Actor tried to finish twice")
+        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
         self.subscribers = None
         self.subscribers = None
+        self._logger.info("finished")
 
     def subscribe(self, subscriber):
         if self.subscribers is None:
             try:
 
     def subscribe(self, subscriber):
         if self.subscribers is None:
             try:
-                subscriber(self._later)
+                subscriber(self.actor_ref.proxy())
             except pykka.ActorDeadError:
                 pass
         else:
             except pykka.ActorDeadError:
                 pass
         else:
@@ -60,6 +66,17 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
                            'last_action': explanation}},
             ).execute()
 
                            'last_action': explanation}},
             ).execute()
 
+    @staticmethod
+    def _finish_on_exception(orig_func):
+        @functools.wraps(orig_func)
+        def finish_wrapper(self, *args, **kwargs):
+            try:
+                return orig_func(self, *args, **kwargs)
+            except Exception as error:
+                self._logger.error("Actor error %s", error)
+                self._finished()
+        return finish_wrapper
+
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
@@ -74,7 +91,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
+            cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self.cloud_size = cloud_size
         self.arvados_node = None
             retry_wait, max_retry_wait)
         self.cloud_size = cloud_size
         self.arvados_node = None
@@ -84,20 +101,23 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
         else:
             self._later.prepare_arvados_node(arvados_node)
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
         self.arvados_node = self._clean_arvados_node(
             node, "Prepared by Node Manager")
         self._later.create_cloud_node()
 
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
         self.arvados_node = self._clean_arvados_node(
             node, "Prepared by Node Manager")
         self._later.create_cloud_node()
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry()
     def create_cloud_node(self):
     @RetryMixin._retry()
     def create_cloud_node(self):
-        self._logger.info("Creating cloud node with size %s.",
+        self._logger.info("Sending create_node request for node size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
@@ -106,6 +126,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
         self._later.update_arvados_node_properties()
 
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
         self._later.update_arvados_node_properties()
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def update_arvados_node_properties(self):
         """Tell Arvados some details about the cloud node.
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def update_arvados_node_properties(self):
         """Tell Arvados some details about the cloud node.
@@ -160,7 +181,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
+            cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
@@ -168,7 +189,11 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         self.cancel_reason = None
         self.success = None
 
         self.cancel_reason = None
         self.success = None
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
     def on_start(self):
     def on_start(self):
+        super(ComputeNodeShutdownActor, self).on_start()
         self._later.shutdown_node()
 
     def _arvados_node(self):
         self._later.shutdown_node()
 
     def _arvados_node(self):
@@ -181,45 +206,43 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     def cancel_shutdown(self, reason):
         self.cancel_reason = reason
 
     def cancel_shutdown(self, reason):
         self.cancel_reason = reason
-        self._logger.info("Cloud node %s shutdown cancelled: %s.",
-                          self.cloud_node.id, reason)
+        self._logger.info("Shutdown cancelled: %s.", reason)
         self._finished(success_flag=False)
 
         self._finished(success_flag=False)
 
-    def _stop_if_window_closed(orig_func):
+    def _cancel_on_exception(orig_func):
         @functools.wraps(orig_func)
         @functools.wraps(orig_func)
-        def stop_wrapper(self, *args, **kwargs):
-            if (self.cancellable and
-                  (not self._monitor.shutdown_eligible().get())):
-                self._later.cancel_shutdown(self.WINDOW_CLOSED)
-                return None
-            else:
+        def finish_wrapper(self, *args, **kwargs):
+            try:
                 return orig_func(self, *args, **kwargs)
                 return orig_func(self, *args, **kwargs)
-        return stop_wrapper
+            except Exception as error:
+                self._logger.error("Actor error %s", error)
+                self._later.cancel_shutdown("Unhandled exception %s" % error)
+        return finish_wrapper
 
 
-    @_stop_if_window_closed
+    @_cancel_on_exception
     @RetryMixin._retry()
     def shutdown_node(self):
     @RetryMixin._retry()
     def shutdown_node(self):
+        self._logger.info("Starting shutdown")
+        arv_node = self._arvados_node()
         if not self._cloud.destroy_node(self.cloud_node):
             if self._cloud.broken(self.cloud_node):
                 self._later.cancel_shutdown(self.NODE_BROKEN)
         if not self._cloud.destroy_node(self.cloud_node):
             if self._cloud.broken(self.cloud_node):
                 self._later.cancel_shutdown(self.NODE_BROKEN)
+                return
             else:
                 # Force a retry.
                 raise cloud_types.LibcloudError("destroy_node failed")
             else:
                 # Force a retry.
                 raise cloud_types.LibcloudError("destroy_node failed")
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
-        arv_node = self._arvados_node()
+        self._logger.info("Shutdown success")
         if arv_node is None:
             self._finished(success_flag=True)
         else:
             self._later.clean_arvados_node(arv_node)
 
         if arv_node is None:
             self._finished(success_flag=True)
         else:
             self._later.clean_arvados_node(arv_node)
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def clean_arvados_node(self, arvados_node):
         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
         self._finished(success_flag=True)
 
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def clean_arvados_node(self, arvados_node):
         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
         self._finished(success_flag=True)
 
-    # Make the decorator available to subclasses.
-    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
-
 
 class ComputeNodeUpdateActor(config.actor_class):
     """Actor to dispatch one-off cloud management requests.
 
 class ComputeNodeUpdateActor(config.actor_class):
     """Actor to dispatch one-off cloud management requests.
@@ -229,12 +252,6 @@ class ComputeNodeUpdateActor(config.actor_class):
     this to perform maintenance tasks on themselves.  Having a
     dedicated actor for this gives us the opportunity to control the
     flow of requests; e.g., by backing off when errors occur.
     this to perform maintenance tasks on themselves.  Having a
     dedicated actor for this gives us the opportunity to control the
     flow of requests; e.g., by backing off when errors occur.
-
-    This actor is most like a "traditional" Pykka actor: there's no
-    subscribing, but instead methods return real driver results.  If
-    you're interested in those results, you should get them from the
-    Future that the proxy method returns.  Be prepared to handle exceptions
-    from the cloud driver when you do.
     """
     def __init__(self, cloud_factory, max_retry_wait=180):
         super(ComputeNodeUpdateActor, self).__init__()
     """
     def __init__(self, cloud_factory, max_retry_wait=180):
         super(ComputeNodeUpdateActor, self).__init__()
@@ -243,6 +260,12 @@ class ComputeNodeUpdateActor(config.actor_class):
         self.error_streak = 0
         self.next_request_time = time.time()
 
         self.error_streak = 0
         self.next_request_time = time.time()
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+    def on_start(self):
+        self._set_logger()
+
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
         def throttle_wrapper(self, *args, **kwargs):
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
         def throttle_wrapper(self, *args, **kwargs):
@@ -253,10 +276,12 @@ class ComputeNodeUpdateActor(config.actor_class):
             try:
                 result = orig_func(self, *args, **kwargs)
             except Exception as error:
             try:
                 result = orig_func(self, *args, **kwargs)
             except Exception as error:
-                self.error_streak += 1
-                self.next_request_time += min(2 ** self.error_streak,
-                                              self.max_retry_wait)
-                raise
+                if self._cloud.is_cloud_exception(error):
+                    self.error_streak += 1
+                    self.next_request_time += min(2 ** self.error_streak,
+                                                  self.max_retry_wait)
+                self._logger.warn(
+                    "Unhandled exception: %s", error, exc_info=error)
             else:
                 self.error_streak = 0
                 return result
             else:
                 self.error_streak = 0
                 return result
@@ -280,8 +305,7 @@ class ComputeNodeMonitorActor(config.actor_class):
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.computenode')
+        self._later = self.actor_ref.tell_proxy()
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
@@ -299,6 +323,13 @@ class ComputeNodeMonitorActor(config.actor_class):
         self.last_shutdown_opening = None
         self._later.consider_shutdown()
 
         self.last_shutdown_opening = None
         self._later.consider_shutdown()
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
+    def on_start(self):
+        self._set_logger()
+        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
+
     def subscribe(self, subscriber):
         self.subscribers.add(subscriber)
 
     def subscribe(self, subscriber):
         self.subscribers.add(subscriber)
 
@@ -308,60 +339,113 @@ class ComputeNodeMonitorActor(config.actor_class):
         self._last_log = msg
         self._logger.debug(msg, *args)
 
         self._last_log = msg
         self._logger.debug(msg, *args)
 
-    def in_state(self, *states):
-        # Return a boolean to say whether or not our Arvados node record is in
-        # one of the given states.  If state information is not
-        # available--because this node has no Arvados record, the record is
-        # stale, or the record has no state information--return None.
-        if (self.arvados_node is None) or not timestamp_fresh(
-              arvados_node_mtime(self.arvados_node), self.node_stale_after):
-            return None
+    def get_state(self):
+        """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
+
+        # If this node is not associated with an Arvados node, return 'unpaired'.
+        if self.arvados_node is None:
+            return 'unpaired'
+
         state = self.arvados_node['crunch_worker_state']
         state = self.arvados_node['crunch_worker_state']
-        if not state:
-            return None
-        result = state in states
-        if state == 'idle':
-            result = result and not self.arvados_node['job_uuid']
-        return result
+
+        # If state information is not available because it is missing or the
+        # record is stale, return 'down'.
+        if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
+                                            self.node_stale_after):
+            state = 'down'
+
+        # There's a window between when a node pings for the first time and the
+        # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
+        # window, the node will still report as 'down'.  Check that
+        # first_ping_at is truthy and consider the node 'idle' during the
+        # initial boot grace period.
+        if (state == 'down' and
+            self.arvados_node['first_ping_at'] and
+            timestamp_fresh(self.cloud_node_start_time,
+                            self.boot_fail_after) and
+            not self._cloud.broken(self.cloud_node)):
+            state = 'idle'
+
+        # "missing" means last_ping_at is stale, this should be
+        # considered "down"
+        if arvados_node_missing(self.arvados_node, self.node_stale_after):
+            state = 'down'
+
+        # Turns out using 'job_uuid' this way is a bad idea.  The node record
+        # is assigned the job_uuid before the job is locked (which removes it
+        # from the queue) which means the job will be double-counted as both in
+        # the wishlist and but also keeping a node busy.  This end result is
+        # excess nodes being booted.
+        #if state == 'idle' and self.arvados_node['job_uuid']:
+        #    state = 'busy'
+
+        return state
+
+    def in_state(self, *states):
+        return self.get_state() in states
 
     def shutdown_eligible(self):
 
     def shutdown_eligible(self):
-        if not self._shutdowns.window_open():
-            return False
-        if self.arvados_node is None:
-            # Node is unpaired.
-            # If it hasn't pinged Arvados after boot_fail seconds, shut it down
-            return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
-        missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
-        if missing and self._cloud.broken(self.cloud_node):
-            # Node is paired, but Arvados says it is missing and the cloud says the node
-            # is in an error state, so shut it down.
-            return True
-        if missing is None and self._cloud.broken(self.cloud_node):
-            self._logger.warning(
-                "cloud reports broken node, but paired node %s never pinged "
-                "(bug?) -- skipped check for node_stale_after",
-                self.arvados_node['uuid'])
-        return self.in_state('idle')
+        """Determine if node is candidate for shut down.
+
+        Returns a tuple of (boolean, string) where the first value is whether
+        the node is candidate for shut down, and the second value is the
+        reason for the decision.
+        """
+
+        # Collect states and then consult state transition table whether we
+        # should shut down.  Possible states are:
+        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
+        # window = ["open", "closed"]
+        # boot_grace = ["boot wait", "boot exceeded"]
+        # idle_grace = ["not idle", "idle wait", "idle exceeded"]
+
+        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
+            return (False, "node state is stale")
+
+        crunch_worker_state = self.get_state()
+
+        window = "open" if self._shutdowns.window_open() else "closed"
+
+        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
+            boot_grace = "boot wait"
+        else:
+            boot_grace = "boot exceeded"
+
+        # API server side not implemented yet.
+        idle_grace = 'idle exceeded'
+
+        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
+        t = transitions[node_state]
+        if t is not None:
+            # yes, shutdown eligible
+            return (True, "node state is %s" % (node_state,))
+        else:
+            # no, return a reason
+            return (False, "node state is %s" % (node_state,))
 
     def consider_shutdown(self):
 
     def consider_shutdown(self):
-        next_opening = self._shutdowns.next_opening()
-        if self.shutdown_eligible():
-            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-            _notify_subscribers(self._later, self.subscribers)
-        elif self._shutdowns.window_open():
-            self._debug("Node %s shutdown window open but node busy.",
-                        self.cloud_node.id)
-        elif self.last_shutdown_opening != next_opening:
-            self._debug("Node %s shutdown window closed.  Next at %s.",
-                        self.cloud_node.id, time.ctime(next_opening))
-            self._timer.schedule(next_opening, self._later.consider_shutdown)
-            self.last_shutdown_opening = next_opening
+        try:
+            eligible, reason = self.shutdown_eligible()
+            next_opening = self._shutdowns.next_opening()
+            if eligible:
+                self._debug("Suggesting shutdown because %s", reason)
+                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
+            else:
+                self._debug("Not eligible for shut down because %s", reason)
+
+                if self.last_shutdown_opening != next_opening:
+                    self._debug("Shutdown window closed.  Next at %s.",
+                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
+                    self._timer.schedule(next_opening, self._later.consider_shutdown)
+                    self.last_shutdown_opening = next_opening
+        except Exception:
+            self._logger.exception("Unexpected exception")
 
     def offer_arvados_pair(self, arvados_node):
         first_ping_s = arvados_node.get('first_ping_at')
         if (self.arvados_node is not None) or (not first_ping_s):
             return None
 
     def offer_arvados_pair(self, arvados_node):
         first_ping_s = arvados_node.get('first_ping_at')
         if (self.arvados_node is not None) or (not first_ping_s):
             return None
-        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id
               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id