Merge branch 'master' into 5383-api-db-current-time
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index d613ef13716a2dad9b946f8d2a158461950dd1ba..70817627dfe8d3194435a7a31f1df8e330e37ed5 100644 (file)
@@ -6,9 +6,11 @@ import functools
 import logging
 import time
 
 import logging
 import time
 
+import libcloud.common.types as cloud_types
 import pykka
 
 import pykka
 
-from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from .. import \
+    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
 from ...clientactor import _notify_subscribers
 from ... import config
 
 from ...clientactor import _notify_subscribers
 from ... import config
 
@@ -18,35 +20,42 @@ class ComputeNodeStateChangeBase(config.actor_class):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+    def __init__(self, logger_name, cloud_client, timer_actor,
+                 retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
         self._logger = logging.getLogger(logger_name)
         self._logger = logging.getLogger(logger_name)
+        self._cloud = cloud_client
+        self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self.retry_wait = retry_wait
         self.subscribers = set()
 
     @staticmethod
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self.retry_wait = retry_wait
         self.subscribers = set()
 
     @staticmethod
-    def _retry(errors):
+    def _retry(errors=()):
         """Retry decorator for an actor method that makes remote requests.
 
         Use this function to decorator an actor method, and pass in a
         tuple of exceptions to catch.  This decorator will schedule
         retries of that method with exponential backoff if the
         """Retry decorator for an actor method that makes remote requests.
 
         Use this function to decorator an actor method, and pass in a
         tuple of exceptions to catch.  This decorator will schedule
         retries of that method with exponential backoff if the
-        original method raises any of the given errors.
+        original method raises a known cloud driver error, or any of the
+        given exception types.
         """
         def decorator(orig_func):
             @functools.wraps(orig_func)
         """
         def decorator(orig_func):
             @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
+            def retry_wrapper(self, *args, **kwargs):
+                start_time = time.time()
                 try:
                     orig_func(self, *args, **kwargs)
                 try:
                     orig_func(self, *args, **kwargs)
-                except errors as error:
+                except Exception as error:
+                    if not (isinstance(error, errors) or
+                            self._cloud.is_cloud_exception(error)):
+                        raise
                     self._logger.warning(
                         "Client error: %s - waiting %s seconds",
                         error, self.retry_wait)
                     self._logger.warning(
                         "Client error: %s - waiting %s seconds",
                         error, self.retry_wait)
-                    self._timer.schedule(self.retry_wait,
+                    self._timer.schedule(start_time + self.retry_wait,
                                          getattr(self._later,
                                                  orig_func.__name__),
                                          *args, **kwargs)
                                          getattr(self._later,
                                                  orig_func.__name__),
                                          *args, **kwargs)
@@ -54,7 +63,7 @@ class ComputeNodeStateChangeBase(config.actor_class):
                                           self.max_retry_wait)
                 else:
                     self.retry_wait = self.min_retry_wait
                                           self.max_retry_wait)
                 else:
                     self.retry_wait = self.min_retry_wait
-            return wrapper
+            return retry_wrapper
         return decorator
 
     def _finished(self):
         return decorator
 
     def _finished(self):
@@ -84,9 +93,9 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+            'arvnodeman.nodeup', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._arvados = arvados_client
         self._arvados = arvados_client
-        self._cloud = cloud_client
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
@@ -95,12 +104,12 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def prepare_arvados_node(self, node):
         self.arvados_node = self._arvados.nodes().update(
             uuid=node['uuid'],
     def prepare_arvados_node(self, node):
         self.arvados_node = self._arvados.nodes().update(
             uuid=node['uuid'],
@@ -114,13 +123,19 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
             ).execute()
         self._later.create_cloud_node()
 
             ).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_cloud_node(self):
         self._logger.info("Creating cloud node with size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
     def create_cloud_node(self):
         self._logger.info("Creating cloud node with size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
+        self._later.post_create()
+
+    @ComputeNodeStateChangeBase._retry()
+    def post_create(self):
+        self._cloud.post_create_node(self.cloud_node)
+        self._logger.info("%s post-create work done.", self.cloud_node.id)
         self._finished()
 
     def stop_if_no_cloud_node(self):
         self._finished()
 
     def stop_if_no_cloud_node(self):
@@ -133,20 +148,56 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     This actor simply destroys a cloud node, retrying as needed.
     """
 
     This actor simply destroys a cloud node, retrying as needed.
     """
-    def __init__(self, timer_actor, cloud_client, cloud_node,
-                 retry_wait=1, max_retry_wait=180):
+    def __init__(self, timer_actor, cloud_client, node_monitor,
+                 cancellable=True, retry_wait=1, max_retry_wait=180):
+        # If a ShutdownActor is cancellable, it will ask the
+        # ComputeNodeMonitorActor if it's still eligible before taking each
+        # action, and stop the shutdown process if the node is no longer
+        # eligible.  Normal shutdowns based on job demand should be
+        # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
-        self.cloud_node = cloud_node
+            'arvnodeman.nodedown', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
+        self._monitor = node_monitor.proxy()
+        self.cloud_node = self._monitor.cloud_node.get()
+        self.cancellable = cancellable
+        self.success = None
+
+    def on_start(self):
         self._later.shutdown_node()
 
         self._later.shutdown_node()
 
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def shutdown_node(self):
-        self._cloud.destroy_node(self.cloud_node)
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+    def cancel_shutdown(self):
+        self.success = False
         self._finished()
 
         self._finished()
 
+    def _stop_if_window_closed(orig_func):
+        @functools.wraps(orig_func)
+        def stop_wrapper(self, *args, **kwargs):
+            if (self.cancellable and
+                  (not self._monitor.shutdown_eligible().get())):
+                self._logger.info(
+                    "Cloud node %s shutdown cancelled - no longer eligible.",
+                    self.cloud_node.id)
+                self._later.cancel_shutdown()
+                return None
+            else:
+                return orig_func(self, *args, **kwargs)
+        return stop_wrapper
+
+    @_stop_if_window_closed
+    @ComputeNodeStateChangeBase._retry()
+    def shutdown_node(self):
+        if self._cloud.destroy_node(self.cloud_node):
+            self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+            self.success = True
+            self._finished()
+        else:
+            # Force a retry.
+            raise cloud_types.LibcloudError("destroy_node failed")
+
+    # Make the decorator available to subclasses.
+    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
+
 
 class ComputeNodeUpdateActor(config.actor_class):
     """Actor to dispatch one-off cloud management requests.
 
 class ComputeNodeUpdateActor(config.actor_class):
     """Actor to dispatch one-off cloud management requests.
@@ -172,14 +223,14 @@ class ComputeNodeUpdateActor(config.actor_class):
 
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
 
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def throttle_wrapper(self, *args, **kwargs):
             throttle_time = self.next_request_time - time.time()
             if throttle_time > 0:
                 time.sleep(throttle_time)
             self.next_request_time = time.time()
             try:
                 result = orig_func(self, *args, **kwargs)
             throttle_time = self.next_request_time - time.time()
             if throttle_time > 0:
                 time.sleep(throttle_time)
             self.next_request_time = time.time()
             try:
                 result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
+            except Exception as error:
                 self.error_streak += 1
                 self.next_request_time += min(2 ** self.error_streak,
                                               self.max_retry_wait)
                 self.error_streak += 1
                 self.next_request_time += min(2 ** self.error_streak,
                                               self.max_retry_wait)
@@ -187,7 +238,7 @@ class ComputeNodeUpdateActor(config.actor_class):
             else:
                 self.error_streak = 0
                 return result
             else:
                 self.error_streak = 0
                 return result
-        return wrapper
+        return throttle_wrapper
 
     @_throttle_errors
     def sync_node(self, cloud_node, arvados_node):
 
     @_throttle_errors
     def sync_node(self, cloud_node, arvados_node):
@@ -202,13 +253,14 @@ class ComputeNodeMonitorActor(config.actor_class):
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 timer_actor, update_actor, arvados_node=None,
+                 cloud_fqdn_func, timer_actor, update_actor, arvados_node=None,
                  poll_stale_after=600, node_stale_after=3600):
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger('arvnodeman.computenode')
         self._last_log = None
         self._shutdowns = shutdown_timer
                  poll_stale_after=600, node_stale_after=3600):
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger('arvnodeman.computenode')
         self._last_log = None
         self._shutdowns = shutdown_timer
+        self._cloud_node_fqdn = cloud_fqdn_func
         self._timer = timer_actor
         self._update = update_actor
         self.cloud_node = cloud_node
         self._timer = timer_actor
         self._update = update_actor
         self.cloud_node = cloud_node
@@ -238,7 +290,7 @@ class ComputeNodeMonitorActor(config.actor_class):
         if (self.arvados_node is None) or not timestamp_fresh(
               arvados_node_mtime(self.arvados_node), self.node_stale_after):
             return None
         if (self.arvados_node is None) or not timestamp_fresh(
               arvados_node_mtime(self.arvados_node), self.node_stale_after):
             return None
-        state = self.arvados_node['info'].get('slurm_state')
+        state = self.arvados_node['crunch_worker_state']
         if not state:
             return None
         result = state in states
         if not state:
             return None
         result = state in states
@@ -246,8 +298,10 @@ class ComputeNodeMonitorActor(config.actor_class):
             result = result and not self.arvados_node['job_uuid']
         return result
 
             result = result and not self.arvados_node['job_uuid']
         return result
 
-    def _shutdown_eligible(self):
-        if self.arvados_node is None:
+    def shutdown_eligible(self):
+        if not self._shutdowns.window_open():
+            return False
+        elif self.arvados_node is None:
             # If this is a new, unpaired node, it's eligible for
             # shutdown--we figure there was an error during bootstrap.
             return timestamp_fresh(self.cloud_node_start_time,
             # If this is a new, unpaired node, it's eligible for
             # shutdown--we figure there was an error during bootstrap.
             return timestamp_fresh(self.cloud_node_start_time,
@@ -257,24 +311,24 @@ class ComputeNodeMonitorActor(config.actor_class):
 
     def consider_shutdown(self):
         next_opening = self._shutdowns.next_opening()
 
     def consider_shutdown(self):
         next_opening = self._shutdowns.next_opening()
-        if self._shutdowns.window_open():
-            if self._shutdown_eligible():
-                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-                _notify_subscribers(self._later, self.subscribers)
-            else:
-                self._debug("Node %s shutdown window open but node busy.",
-                            self.cloud_node.id)
-        else:
+        if self.shutdown_eligible():
+            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+            _notify_subscribers(self._later, self.subscribers)
+        elif self._shutdowns.window_open():
+            self._debug("Node %s shutdown window open but node busy.",
+                        self.cloud_node.id)
+        elif self.last_shutdown_opening != next_opening:
             self._debug("Node %s shutdown window closed.  Next at %s.",
                         self.cloud_node.id, time.ctime(next_opening))
             self._debug("Node %s shutdown window closed.  Next at %s.",
                         self.cloud_node.id, time.ctime(next_opening))
-        if self.last_shutdown_opening != next_opening:
             self._timer.schedule(next_opening, self._later.consider_shutdown)
             self.last_shutdown_opening = next_opening
 
     def offer_arvados_pair(self, arvados_node):
             self._timer.schedule(next_opening, self._later.consider_shutdown)
             self.last_shutdown_opening = next_opening
 
     def offer_arvados_pair(self, arvados_node):
-        if self.arvados_node is not None:
+        first_ping_s = arvados_node.get('first_ping_at')
+        if (self.arvados_node is not None) or (not first_ping_s):
             return None
             return None
-        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id
         else:
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id
         else:
@@ -286,9 +340,17 @@ class ComputeNodeMonitorActor(config.actor_class):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
+        # If the cloud node's FQDN doesn't match what's in the Arvados node
+        # record, make them match.
+        # This method is a little unusual in the way it just fires off the
+        # request without checking the result or retrying errors.  That's
+        # because this update happens every time we reload the Arvados node
+        # list: if a previous sync attempt failed, we'll see that the names
+        # are out of sync and just try again.  ComputeNodeUpdateActor has
+        # the logic to throttle those effective retries when there's trouble.
         if arvados_node is not None:
             self.arvados_node = arvados_node
         if arvados_node is not None:
             self.arvados_node = arvados_node
-            new_hostname = arvados_node_fqdn(self.arvados_node)
-            if new_hostname != self.cloud_node.name:
+            if (self._cloud_node_fqdn(self.cloud_node) !=
+                  arvados_node_fqdn(self.arvados_node)):
                 self._update.sync_node(self.cloud_node, self.arvados_node)
             self._later.consider_shutdown()
                 self._update.sync_node(self.cloud_node, self.arvados_node)
             self._later.consider_shutdown()