13804: Try to cancel pending shutdowns if nodes are needed
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index 8c983c1ca4042726881ce0e9019da0e9c2df9b60..bb83a193fa30bae46dc7a7f6979843eb65fd42d1 100644 (file)
@@ -1,12 +1,18 @@
 #!/usr/bin/env python
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
 import functools
 import logging
 import time
 
 from __future__ import absolute_import, print_function
 
 import functools
 import logging
 import time
+import re
 
 import libcloud.common.types as cloud_types
 
 import libcloud.common.types as cloud_types
+from libcloud.common.exceptions import BaseHTTPError
+
 import pykka
 
 from .. import \
 import pykka
 
 from .. import \
@@ -14,6 +20,10 @@ from .. import \
     arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
 from ... import config
     arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
 from ... import config
+from ... import status
+from .transitions import transitions
+
+QuotaExceeded = "QuotaExceeded"
 
 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     """Base class for actors that change a compute node's state.
 
 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     """Base class for actors that change a compute node's state.
@@ -21,27 +31,32 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
+    def __init__(self, cloud_client, arvados_client, timer_actor,
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
-        RetryMixin.__init__(self,
-                            retry_wait,
-                            max_retry_wait,
-                            logging.getLogger(logger_name),
-                            cloud_client,
-                            timer_actor)
-        self._later = self.actor_ref.proxy()
+        RetryMixin.__init__(self, retry_wait, max_retry_wait,
+                            None, cloud_client, timer_actor)
+        self._later = self.actor_ref.tell_proxy()
         self._arvados = arvados_client
         self.subscribers = set()
 
         self._arvados = arvados_client
         self.subscribers = set()
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+    def on_start(self):
+        self._set_logger()
+
     def _finished(self):
     def _finished(self):
-        _notify_subscribers(self._later, self.subscribers)
+        if self.subscribers is None:
+            raise Exception("Actor tried to finish twice")
+        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
         self.subscribers = None
         self.subscribers = None
+        self._logger.info("finished")
 
     def subscribe(self, subscriber):
         if self.subscribers is None:
             try:
 
     def subscribe(self, subscriber):
         if self.subscribers is None:
             try:
-                subscriber(self._later)
+                subscriber(self.actor_ref.proxy())
             except pykka.ActorDeadError:
                 pass
         else:
             except pykka.ActorDeadError:
                 pass
         else:
@@ -60,6 +75,17 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
                            'last_action': explanation}},
             ).execute()
 
                            'last_action': explanation}},
             ).execute()
 
+    @staticmethod
+    def _finish_on_exception(orig_func):
+        @functools.wraps(orig_func)
+        def finish_wrapper(self, *args, **kwargs):
+            try:
+                return orig_func(self, *args, **kwargs)
+            except Exception as error:
+                self._logger.error("Actor error %s", error)
+                self._finished()
+        return finish_wrapper
+
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
@@ -74,38 +100,71 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
+            cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
             retry_wait, max_retry_wait)
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
+        self.error = None
         if arvados_node is None:
             self._later.create_arvados_node()
         else:
             self._later.prepare_arvados_node(arvados_node)
 
         if arvados_node is None:
             self._later.create_arvados_node()
         else:
             self._later.prepare_arvados_node(arvados_node)
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
-        self.arvados_node = self._arvados.nodes().create(body={}).execute()
+        self.arvados_node = self._arvados.nodes().create(
+            body={}, assign_slot=True).execute()
         self._later.create_cloud_node()
 
         self._later.create_cloud_node()
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
-        self.arvados_node = self._clean_arvados_node(
-            node, "Prepared by Node Manager")
+        self._clean_arvados_node(node, "Prepared by Node Manager")
+        self.arvados_node = self._arvados.nodes().update(
+            uuid=node['uuid'], body={}, assign_slot=True).execute()
         self._later.create_cloud_node()
 
         self._later.create_cloud_node()
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry()
     def create_cloud_node(self):
     @RetryMixin._retry()
     def create_cloud_node(self):
-        self._logger.info("Creating cloud node with size %s.",
+        self._logger.info("Sending create_node request for node size %s.",
                           self.cloud_size.name)
                           self.cloud_size.name)
-        self.cloud_node = self._cloud.create_node(self.cloud_size,
-                                                  self.arvados_node)
-        if not self.cloud_node.size:
-             self.cloud_node.size = self.cloud_size
+        try:
+            self.cloud_node = self._cloud.create_node(self.cloud_size,
+                                                      self.arvados_node)
+        except BaseHTTPError as e:
+            if e.code == 429 or "RequestLimitExceeded" in e.message:
+                # Don't consider API rate limits to be quota errors.
+                # re-raise so the Retry logic applies.
+                raise
+
+            # The set of possible error codes / messages isn't documented for
+            # all clouds, so use a keyword heuristic to determine if the
+            # failure is likely due to a quota.
+            if re.search(r'(exceed|quota|limit)', e.message, re.I):
+                self.error = QuotaExceeded
+                self._logger.warning("Quota exceeded: %s", e)
+                self._finished()
+                return
+            else:
+                # Something else happened, re-raise so the Retry logic applies.
+                raise
+        except Exception as e:
+            raise
+
+        # The information included in the node size object we get from libcloud
+        # is inconsistent between cloud drivers.  Replace the libcloud NodeSize
+        # object with a compatible CloudSizeWrapper object, which merges the
+        # size info reported by the cloud with size information from the
+        # configuration file.
+        self.cloud_node.size = self.cloud_size
+
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
         self._later.update_arvados_node_properties()
 
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
         self._later.update_arvados_node_properties()
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def update_arvados_node_properties(self):
         """Tell Arvados some details about the cloud node.
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def update_arvados_node_properties(self):
         """Tell Arvados some details about the cloud node.
@@ -150,7 +209,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     """
     # Reasons for a shutdown to be cancelled.
     WINDOW_CLOSED = "shutdown window closed"
     """
     # Reasons for a shutdown to be cancelled.
     WINDOW_CLOSED = "shutdown window closed"
-    NODE_BROKEN = "cloud failed to shut down broken node"
+    DESTROY_FAILED = "destroy_node failed"
 
     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                  cancellable=True, retry_wait=1, max_retry_wait=180):
 
     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                  cancellable=True, retry_wait=1, max_retry_wait=180):
@@ -160,7 +219,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
+            cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
@@ -168,7 +227,11 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         self.cancel_reason = None
         self.success = None
 
         self.cancel_reason = None
         self.success = None
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
     def on_start(self):
     def on_start(self):
+        super(ComputeNodeShutdownActor, self).on_start()
         self._later.shutdown_node()
 
     def _arvados_node(self):
         self._later.shutdown_node()
 
     def _arvados_node(self):
@@ -179,49 +242,66 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
             self.success = success_flag
         return super(ComputeNodeShutdownActor, self)._finished()
 
             self.success = success_flag
         return super(ComputeNodeShutdownActor, self)._finished()
 
-    def cancel_shutdown(self, reason):
+    def cancel_shutdown(self, reason, **kwargs):
+        if not self.cancellable:
+            return False
+        if self.cancel_reason is not None:
+            # already cancelled
+            return False
         self.cancel_reason = reason
         self.cancel_reason = reason
-        self._logger.info("Cloud node %s shutdown cancelled: %s.",
-                          self.cloud_node.id, reason)
+        self._logger.info("Shutdown cancelled: %s.", reason)
         self._finished(success_flag=False)
         self._finished(success_flag=False)
+        return True
 
 
-    def _stop_if_window_closed(orig_func):
+    def _cancel_on_exception(orig_func):
         @functools.wraps(orig_func)
         @functools.wraps(orig_func)
-        def stop_wrapper(self, *args, **kwargs):
-            if (self.cancellable and
-                  (not self._monitor.shutdown_eligible().get())):
-                self._later.cancel_shutdown(self.WINDOW_CLOSED)
-                return None
-            else:
+        def finish_wrapper(self, *args, **kwargs):
+            try:
                 return orig_func(self, *args, **kwargs)
                 return orig_func(self, *args, **kwargs)
-        return stop_wrapper
+            except Exception as error:
+                self._logger.error("Actor error %s", error)
+                self._logger.debug("", exc_info=True)
+                self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
+        return finish_wrapper
 
 
-    @_stop_if_window_closed
-    @RetryMixin._retry()
+    @_cancel_on_exception
     def shutdown_node(self):
     def shutdown_node(self):
-        if not self._cloud.destroy_node(self.cloud_node):
-            if self._cloud.broken(self.cloud_node):
-                self._later.cancel_shutdown(self.NODE_BROKEN)
-            else:
-                # Force a retry.
-                raise cloud_types.LibcloudError("destroy_node failed")
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+        if self.cancel_reason is not None:
+            # already cancelled
+            return
+        if self.cancellable:
+            self._logger.info("Checking that node is still eligible for shutdown")
+            eligible, reason = self._monitor.shutdown_eligible().get()
+            if not eligible:
+                self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
+                                     try_resume=True)
+                return
+        # If boot failed, count the event
+        if self._monitor.get_state().get() == 'unpaired':
+            status.tracker.counter_add('boot_failures')
+        self._destroy_node()
+
+    def _destroy_node(self):
+        self._logger.info("Starting shutdown")
         arv_node = self._arvados_node()
         arv_node = self._arvados_node()
-        if arv_node is None:
-            self._finished(success_flag=True)
+        if self._cloud.destroy_node(self.cloud_node):
+            self.cancellable = False
+            self._logger.info("Shutdown success")
+            if arv_node:
+                self._later.clean_arvados_node(arv_node)
+            else:
+                self._finished(success_flag=True)
         else:
         else:
-            self._later.clean_arvados_node(arv_node)
+            self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)
 
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def clean_arvados_node(self, arvados_node):
         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
         self._finished(success_flag=True)
 
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def clean_arvados_node(self, arvados_node):
         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
         self._finished(success_flag=True)
 
-    # Make the decorator available to subclasses.
-    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
 
 
-
-class ComputeNodeUpdateActor(config.actor_class):
+class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
     """Actor to dispatch one-off cloud management requests.
 
     This actor receives requests for small cloud updates, and
     """Actor to dispatch one-off cloud management requests.
 
     This actor receives requests for small cloud updates, and
@@ -229,42 +309,24 @@ class ComputeNodeUpdateActor(config.actor_class):
     this to perform maintenance tasks on themselves.  Having a
     dedicated actor for this gives us the opportunity to control the
     flow of requests; e.g., by backing off when errors occur.
     this to perform maintenance tasks on themselves.  Having a
     dedicated actor for this gives us the opportunity to control the
     flow of requests; e.g., by backing off when errors occur.
-
-    This actor is most like a "traditional" Pykka actor: there's no
-    subscribing, but instead methods return real driver results.  If
-    you're interested in those results, you should get them from the
-    Future that the proxy method returns.  Be prepared to handle exceptions
-    from the cloud driver when you do.
     """
     """
-    def __init__(self, cloud_factory, max_retry_wait=180):
+    def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
         super(ComputeNodeUpdateActor, self).__init__()
         super(ComputeNodeUpdateActor, self).__init__()
+        RetryMixin.__init__(self, 1, max_retry_wait,
+                            None, cloud_factory(), timer_actor)
         self._cloud = cloud_factory()
         self._cloud = cloud_factory()
-        self.max_retry_wait = max_retry_wait
-        self.error_streak = 0
-        self.next_request_time = time.time()
+        self._later = self.actor_ref.tell_proxy()
 
 
-    def _throttle_errors(orig_func):
-        @functools.wraps(orig_func)
-        def throttle_wrapper(self, *args, **kwargs):
-            throttle_time = self.next_request_time - time.time()
-            if throttle_time > 0:
-                time.sleep(throttle_time)
-            self.next_request_time = time.time()
-            try:
-                result = orig_func(self, *args, **kwargs)
-            except Exception as error:
-                self.error_streak += 1
-                self.next_request_time += min(2 ** self.error_streak,
-                                              self.max_retry_wait)
-                raise
-            else:
-                self.error_streak = 0
-                return result
-        return throttle_wrapper
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+    def on_start(self):
+        self._set_logger()
 
 
-    @_throttle_errors
+    @RetryMixin._retry()
     def sync_node(self, cloud_node, arvados_node):
     def sync_node(self, cloud_node, arvados_node):
-        return self._cloud.sync_node(cloud_node, arvados_node)
+        if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
+            return self._cloud.sync_node(cloud_node, arvados_node)
 
 
 class ComputeNodeMonitorActor(config.actor_class):
 
 
 class ComputeNodeMonitorActor(config.actor_class):
@@ -275,16 +337,13 @@ class ComputeNodeMonitorActor(config.actor_class):
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+                 timer_actor, update_actor, cloud_client,
                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.computenode')
-        self._last_log = None
+        self._later = self.actor_ref.tell_proxy()
         self._shutdowns = shutdown_timer
         self._shutdowns = shutdown_timer
-        self._cloud_node_fqdn = cloud_fqdn_func
         self._timer = timer_actor
         self._update = update_actor
         self._cloud = cloud_client
         self._timer = timer_actor
         self._update = update_actor
         self._cloud = cloud_client
@@ -295,73 +354,149 @@ class ComputeNodeMonitorActor(config.actor_class):
         self.boot_fail_after = boot_fail_after
         self.subscribers = set()
         self.arvados_node = None
         self.boot_fail_after = boot_fail_after
         self.subscribers = set()
         self.arvados_node = None
+        self.consecutive_idle = 0
         self._later.update_arvados_node(arvados_node)
         self.last_shutdown_opening = None
         self._later.consider_shutdown()
 
         self._later.update_arvados_node(arvados_node)
         self.last_shutdown_opening = None
         self._later.consider_shutdown()
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
+    def on_start(self):
+        self._set_logger()
+        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
+
     def subscribe(self, subscriber):
         self.subscribers.add(subscriber)
 
     def _debug(self, msg, *args):
     def subscribe(self, subscriber):
         self.subscribers.add(subscriber)
 
     def _debug(self, msg, *args):
-        if msg == self._last_log:
-            return
-        self._last_log = msg
         self._logger.debug(msg, *args)
 
         self._logger.debug(msg, *args)
 
-    def in_state(self, *states):
-        # Return a boolean to say whether or not our Arvados node record is in
-        # one of the given states.  If state information is not
-        # available--because this node has no Arvados record, the record is
-        # stale, or the record has no state information--return None.
-        if (self.arvados_node is None) or not timestamp_fresh(
-              arvados_node_mtime(self.arvados_node), self.node_stale_after):
-            return None
+    def get_state(self):
+        """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
+
+        # If this node is not associated with an Arvados node, return
+        # 'unpaired' if we're in the boot grace period, and 'down' if not,
+        # so it isn't counted towards usable nodes.
+        if self.arvados_node is None:
+            if timestamp_fresh(self.cloud_node_start_time,
+                               self.boot_fail_after):
+                return 'unpaired'
+            else:
+                return 'down'
+
         state = self.arvados_node['crunch_worker_state']
         state = self.arvados_node['crunch_worker_state']
-        if not state:
-            return None
-        result = state in states
+
+        # If state information is not available because it is missing or the
+        # record is stale, return 'down'.
+        if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
+                                            self.node_stale_after):
+            state = 'down'
+
+        # There's a window between when a node pings for the first time and the
+        # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
+        # window, the node will still report as 'down'.  Check that
+        # first_ping_at is truthy and consider the node 'idle' during the
+        # initial boot grace period.
+        if (state == 'down' and
+            self.arvados_node['first_ping_at'] and
+            timestamp_fresh(self.cloud_node_start_time,
+                            self.boot_fail_after) and
+            not self._cloud.broken(self.cloud_node)):
+            state = 'idle'
+
+        # "missing" means last_ping_at is stale; such a node should be
+        # considered "down".
+        if arvados_node_missing(self.arvados_node, self.node_stale_after):
+            state = 'down'
+
+        # Turns out using 'job_uuid' this way is a bad idea.  The node record
+        # is assigned the job_uuid before the job is locked (which removes it
+        # from the queue), which means the job will be double-counted as both
+        # in the wishlist and keeping a node busy.  The end result is excess
+        # nodes being booted.
+        #if state == 'idle' and self.arvados_node['job_uuid']:
+        #    state = 'busy'
+
+        # Update idle node times tracker
         if state == 'idle':
         if state == 'idle':
-            result = result and not self.arvados_node['job_uuid']
-        return result
+            status.tracker.idle_in(self.arvados_node['hostname'])
+        else:
+            status.tracker.idle_out(self.arvados_node['hostname'])
+
+        return state
+
+    def in_state(self, *states):
+        return self.get_state() in states
 
     def shutdown_eligible(self):
 
     def shutdown_eligible(self):
-        if not self._shutdowns.window_open():
-            return False
-        if self.arvados_node is None:
-            # Node is unpaired.
-            # If it hasn't pinged Arvados after boot_fail seconds, shut it down
-            return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
-        missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
-        if missing and self._cloud.broken(self.cloud_node):
-            # Node is paired, but Arvados says it is missing and the cloud says the node
-            # is in an error state, so shut it down.
-            return True
-        if missing is None and self._cloud.broken(self.cloud_node):
-            self._logger.warning(
-                "cloud reports broken node, but paired node %s never pinged "
-                "(bug?) -- skipped check for node_stale_after",
-                self.arvados_node['uuid'])
-        return self.in_state('idle')
+        """Determine if the node is a candidate for shutdown.
+
+        Returns a tuple of (boolean, string) where the first value is whether
+        the node is a candidate for shutdown, and the second value is the
+        reason for the decision.
+        """
+
+        # Collect states and then consult the state transition table to decide
+        # whether we should shut down.  Possible states are:
+        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
+        # window = ["open", "closed"]
+        # boot_grace = ["boot wait", "boot exceeded"]
+        # idle_grace = ["not idle", "idle wait", "idle exceeded"]
+
+        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
+            return (False, "node state is stale")
+
+        crunch_worker_state = self.get_state()
+
+        window = "open" if self._shutdowns.window_open() else "closed"
+
+        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
+            boot_grace = "boot wait"
+        else:
+            boot_grace = "boot exceeded"
+
+        if crunch_worker_state == "idle":
+            # Must report as "idle" at least two consecutive times
+            if self.consecutive_idle < 2:
+                idle_grace = 'idle wait'
+            else:
+                idle_grace = 'idle exceeded'
+        else:
+            idle_grace = 'not idle'
+
+        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
+        t = transitions[node_state]
+        if t is not None:
+            # yes, shutdown eligible
+            return (True, "node state is %s" % (node_state,))
+        else:
+            # no, return a reason
+            return (False, "node state is %s" % (node_state,))
 
     def consider_shutdown(self):
 
     def consider_shutdown(self):
-        next_opening = self._shutdowns.next_opening()
-        if self.shutdown_eligible():
-            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-            _notify_subscribers(self._later, self.subscribers)
-        elif self._shutdowns.window_open():
-            self._debug("Node %s shutdown window open but node busy.",
-                        self.cloud_node.id)
-        elif self.last_shutdown_opening != next_opening:
-            self._debug("Node %s shutdown window closed.  Next at %s.",
-                        self.cloud_node.id, time.ctime(next_opening))
-            self._timer.schedule(next_opening, self._later.consider_shutdown)
-            self.last_shutdown_opening = next_opening
+        try:
+            eligible, reason = self.shutdown_eligible()
+            next_opening = self._shutdowns.next_opening()
+            if eligible:
+                self._debug("Suggesting shutdown because %s", reason)
+                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
+            else:
+                self._debug("Not eligible for shut down because %s", reason)
+
+                if self.last_shutdown_opening != next_opening:
+                    self._debug("Shutdown window closed.  Next at %s.",
+                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
+                    self._timer.schedule(next_opening, self._later.consider_shutdown)
+                    self.last_shutdown_opening = next_opening
+        except Exception:
+            self._logger.exception("Unexpected exception")
 
     def offer_arvados_pair(self, arvados_node):
         first_ping_s = arvados_node.get('first_ping_at')
         if (self.arvados_node is not None) or (not first_ping_s):
             return None
 
     def offer_arvados_pair(self, arvados_node):
         first_ping_s = arvados_node.get('first_ping_at')
         if (self.arvados_node is not None) or (not first_ping_s):
             return None
-        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id
               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id
@@ -374,8 +509,11 @@ class ComputeNodeMonitorActor(config.actor_class):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
-        # If the cloud node's FQDN doesn't match what's in the Arvados node
-        # record, make them match.
+        """Called when the latest Arvados node record is retrieved.
+
+        Calls the updater's sync_node() method.
+
+        """
         # This method is a little unusual in the way it just fires off the
         # request without checking the result or retrying errors.  That's
         # because this update happens every time we reload the Arvados node
         # This method is a little unusual in the way it just fires off the
         # request without checking the result or retrying errors.  That's
         # because this update happens every time we reload the Arvados node
@@ -384,7 +522,9 @@ class ComputeNodeMonitorActor(config.actor_class):
         # the logic to throttle those effective retries when there's trouble.
         if arvados_node is not None:
             self.arvados_node = arvados_node
         # the logic to throttle those effective retries when there's trouble.
         if arvados_node is not None:
             self.arvados_node = arvados_node
-            if (self._cloud_node_fqdn(self.cloud_node) !=
-                  arvados_node_fqdn(self.arvados_node)):
-                self._update.sync_node(self.cloud_node, self.arvados_node)
+            self._update.sync_node(self.cloud_node, self.arvados_node)
+            if self.arvados_node['crunch_worker_state'] == "idle":
+                self.consecutive_idle += 1
+            else:
+                self.consecutive_idle = 0
             self._later.consider_shutdown()
             self._later.consider_shutdown()