Merge branch 'master' into 5383-api-db-current-time
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index 4dc3dcb7fb9215f813f3f531ffac3fbc563988a3..70817627dfe8d3194435a7a31f1df8e330e37ed5 100644 (file)
@@ -6,9 +6,11 @@ import functools
 import logging
 import time
 
+import libcloud.common.types as cloud_types
 import pykka
 
-from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from .. import \
+    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
 from ...clientactor import _notify_subscribers
 from ... import config
 
@@ -18,35 +20,42 @@ class ComputeNodeStateChangeBase(config.actor_class):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+    def __init__(self, logger_name, cloud_client, timer_actor,
+                 retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
         self._logger = logging.getLogger(logger_name)
+        self._cloud = cloud_client
+        self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self.retry_wait = retry_wait
         self.subscribers = set()
 
     @staticmethod
-    def _retry(errors):
+    def _retry(errors=()):
         """Retry decorator for an actor method that makes remote requests.
 
         Use this function to decorate an actor method, and pass in a
         tuple of exceptions to catch.  This decorator will schedule
         retries of that method with exponential backoff if the
-        original method raises any of the given errors.
+        original method raises a known cloud driver error, or any of the
+        given exception types.
         """
         def decorator(orig_func):
             @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
+            def retry_wrapper(self, *args, **kwargs):
+                start_time = time.time()
                 try:
                     orig_func(self, *args, **kwargs)
-                except errors as error:
+                except Exception as error:
+                    if not (isinstance(error, errors) or
+                            self._cloud.is_cloud_exception(error)):
+                        raise
                     self._logger.warning(
                         "Client error: %s - waiting %s seconds",
                         error, self.retry_wait)
-                    self._timer.schedule(self.retry_wait,
+                    self._timer.schedule(start_time + self.retry_wait,
                                          getattr(self._later,
                                                  orig_func.__name__),
                                          *args, **kwargs)
@@ -54,7 +63,7 @@ class ComputeNodeStateChangeBase(config.actor_class):
                                           self.max_retry_wait)
                 else:
                     self.retry_wait = self.min_retry_wait
-            return wrapper
+            return retry_wrapper
         return decorator
 
     def _finished(self):
@@ -84,9 +93,9 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+            'arvnodeman.nodeup', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._arvados = arvados_client
-        self._cloud = cloud_client
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
@@ -95,12 +104,12 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def prepare_arvados_node(self, node):
         self.arvados_node = self._arvados.nodes().update(
             uuid=node['uuid'],
@@ -114,13 +123,19 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
             ).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_cloud_node(self):
         self._logger.info("Creating cloud node with size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
+        self._later.post_create()
+
+    @ComputeNodeStateChangeBase._retry()
+    def post_create(self):
+        self._cloud.post_create_node(self.cloud_node)
+        self._logger.info("%s post-create work done.", self.cloud_node.id)
         self._finished()
 
     def stop_if_no_cloud_node(self):
@@ -133,20 +148,56 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     This actor simply destroys a cloud node, retrying as needed.
     """
-    def __init__(self, timer_actor, cloud_client, cloud_node,
-                 retry_wait=1, max_retry_wait=180):
+    def __init__(self, timer_actor, cloud_client, node_monitor,
+                 cancellable=True, retry_wait=1, max_retry_wait=180):
+        # If a ShutdownActor is cancellable, it will ask the
+        # ComputeNodeMonitorActor if it's still eligible before taking each
+        # action, and stop the shutdown process if the node is no longer
+        # eligible.  Normal shutdowns based on job demand should be
+        # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
-        self.cloud_node = cloud_node
+            'arvnodeman.nodedown', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
+        self._monitor = node_monitor.proxy()
+        self.cloud_node = self._monitor.cloud_node.get()
+        self.cancellable = cancellable
+        self.success = None
+
+    def on_start(self):
         self._later.shutdown_node()
 
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def shutdown_node(self):
-        self._cloud.destroy_node(self.cloud_node)
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+    def cancel_shutdown(self):
+        self.success = False
         self._finished()
 
+    def _stop_if_window_closed(orig_func):
+        @functools.wraps(orig_func)
+        def stop_wrapper(self, *args, **kwargs):
+            if (self.cancellable and
+                  (not self._monitor.shutdown_eligible().get())):
+                self._logger.info(
+                    "Cloud node %s shutdown cancelled - no longer eligible.",
+                    self.cloud_node.id)
+                self._later.cancel_shutdown()
+                return None
+            else:
+                return orig_func(self, *args, **kwargs)
+        return stop_wrapper
+
+    @_stop_if_window_closed
+    @ComputeNodeStateChangeBase._retry()
+    def shutdown_node(self):
+        if self._cloud.destroy_node(self.cloud_node):
+            self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+            self.success = True
+            self._finished()
+        else:
+            # Force a retry.
+            raise cloud_types.LibcloudError("destroy_node failed")
+
+    # Make the decorator available to subclasses.
+    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
+
 
 class ComputeNodeUpdateActor(config.actor_class):
     """Actor to dispatch one-off cloud management requests.
@@ -172,14 +223,14 @@ class ComputeNodeUpdateActor(config.actor_class):
 
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def throttle_wrapper(self, *args, **kwargs):
             throttle_time = self.next_request_time - time.time()
             if throttle_time > 0:
                 time.sleep(throttle_time)
             self.next_request_time = time.time()
             try:
                 result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
+            except Exception as error:
                 self.error_streak += 1
                 self.next_request_time += min(2 ** self.error_streak,
                                               self.max_retry_wait)
@@ -187,7 +238,7 @@ class ComputeNodeUpdateActor(config.actor_class):
             else:
                 self.error_streak = 0
                 return result
-        return wrapper
+        return throttle_wrapper
 
     @_throttle_errors
     def sync_node(self, cloud_node, arvados_node):
@@ -202,13 +253,14 @@ class ComputeNodeMonitorActor(config.actor_class):
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 timer_actor, update_actor, arvados_node=None,
+                 cloud_fqdn_func, timer_actor, update_actor, arvados_node=None,
                  poll_stale_after=600, node_stale_after=3600):
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger('arvnodeman.computenode')
         self._last_log = None
         self._shutdowns = shutdown_timer
+        self._cloud_node_fqdn = cloud_fqdn_func
         self._timer = timer_actor
         self._update = update_actor
         self.cloud_node = cloud_node
@@ -238,7 +290,7 @@ class ComputeNodeMonitorActor(config.actor_class):
         if (self.arvados_node is None) or not timestamp_fresh(
               arvados_node_mtime(self.arvados_node), self.node_stale_after):
             return None
-        state = self.arvados_node['info'].get('slurm_state')
+        state = self.arvados_node['crunch_worker_state']
         if not state:
             return None
         result = state in states
@@ -272,9 +324,11 @@ class ComputeNodeMonitorActor(config.actor_class):
             self.last_shutdown_opening = next_opening
 
     def offer_arvados_pair(self, arvados_node):
-        if self.arvados_node is not None:
+        first_ping_s = arvados_node.get('first_ping_at')
+        if (self.arvados_node is not None) or (not first_ping_s):
             return None
-        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id
         else:
@@ -286,9 +340,17 @@ class ComputeNodeMonitorActor(config.actor_class):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
+        # If the cloud node's FQDN doesn't match what's in the Arvados node
+        # record, make them match.
+        # This method is a little unusual in the way it just fires off the
+        # request without checking the result or retrying errors.  That's
+        # because this update happens every time we reload the Arvados node
+        # list: if a previous sync attempt failed, we'll see that the names
+        # are out of sync and just try again.  ComputeNodeUpdateActor has
+        # the logic to throttle those effective retries when there's trouble.
         if arvados_node is not None:
             self.arvados_node = arvados_node
-            new_hostname = arvados_node_fqdn(self.arvados_node)
-            if new_hostname != self.cloud_node.name:
+            if (self._cloud_node_fqdn(self.cloud_node) !=
+                  arvados_node_fqdn(self.arvados_node)):
                 self._update.sync_node(self.cloud_node, self.arvados_node)
             self._later.consider_shutdown()