3410: Merge branch 'master' into 3410-replication-attrs
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index ae0a65b9317e86f8d53afa00ba7b9483f047c4aa..1608b529fb848e00ac8968ef8a04a427e6f4e0a7 100644 (file)
@@ -19,32 +19,38 @@ class ComputeNodeStateChangeBase(config.actor_class):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+    def __init__(self, logger_name, cloud_client, timer_actor,
+                 retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
         self._logger = logging.getLogger(logger_name)
+        self._cloud = cloud_client
+        self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self.retry_wait = retry_wait
         self.subscribers = set()
 
     @staticmethod
-    def _retry(errors):
+    def _retry(errors=()):
         """Retry decorator for an actor method that makes remote requests.
 
         Use this function to decorator an actor method, and pass in a
         tuple of exceptions to catch.  This decorator will schedule
         retries of that method with exponential backoff if the
-        original method raises any of the given errors.
+        original method raises a known cloud driver error, or any of the
+        given exception types.
         """
         def decorator(orig_func):
             @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
+            def retry_wrapper(self, *args, **kwargs):
                 start_time = time.time()
                 try:
                     orig_func(self, *args, **kwargs)
-                except errors as error:
+                except Exception as error:
+                    if not (isinstance(error, errors) or
+                            self._cloud.is_cloud_exception(error)):
+                        raise
                     self._logger.warning(
                         "Client error: %s - waiting %s seconds",
                         error, self.retry_wait)
@@ -56,7 +62,7 @@ class ComputeNodeStateChangeBase(config.actor_class):
                                           self.max_retry_wait)
                 else:
                     self.retry_wait = self.min_retry_wait
-            return wrapper
+            return retry_wrapper
         return decorator
 
     def _finished(self):
@@ -86,9 +92,9 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+            'arvnodeman.nodeup', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._arvados = arvados_client
-        self._cloud = cloud_client
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
@@ -97,12 +103,12 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def prepare_arvados_node(self, node):
         self.arvados_node = self._arvados.nodes().update(
             uuid=node['uuid'],
@@ -116,13 +122,19 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
             ).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_cloud_node(self):
         self._logger.info("Creating cloud node with size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
+        self._later.post_create()
+
+    @ComputeNodeStateChangeBase._retry()
+    def post_create(self):
+        self._cloud.post_create_node(self.cloud_node)
+        self._logger.info("%s post-create work done.", self.cloud_node.id)
         self._finished()
 
     def stop_if_no_cloud_node(self):
@@ -136,12 +148,18 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     This actor simply destroys a cloud node, retrying as needed.
     """
     def __init__(self, timer_actor, cloud_client, node_monitor,
-                 retry_wait=1, max_retry_wait=180):
+                 cancellable=True, retry_wait=1, max_retry_wait=180):
+        # If a ShutdownActor is cancellable, it will ask the
+        # ComputeNodeMonitorActor if it's still eligible before taking each
+        # action, and stop the shutdown process if the node is no longer
+        # eligible.  Normal shutdowns based on job demand should be
+        # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
+            'arvnodeman.nodedown', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
+        self.cancellable = cancellable
         self.success = None
 
     def on_start(self):
@@ -153,8 +171,9 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            if not self._monitor.shutdown_eligible().get():
+        def stop_wrapper(self, *args, **kwargs):
+            if (self.cancellable and
+                  (not self._monitor.shutdown_eligible().get())):
                 self._logger.info(
                     "Cloud node %s shutdown cancelled - no longer eligible.",
                     self.cloud_node.id)
@@ -162,10 +181,10 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
                 return None
             else:
                 return orig_func(self, *args, **kwargs)
-        return wrapper
+        return stop_wrapper
 
     @_stop_if_window_closed
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def shutdown_node(self):
         if self._cloud.destroy_node(self.cloud_node):
             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
@@ -203,14 +222,14 @@ class ComputeNodeUpdateActor(config.actor_class):
 
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def throttle_wrapper(self, *args, **kwargs):
             throttle_time = self.next_request_time - time.time()
             if throttle_time > 0:
                 time.sleep(throttle_time)
             self.next_request_time = time.time()
             try:
                 result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
+            except Exception as error:
                 self.error_streak += 1
                 self.next_request_time += min(2 ** self.error_streak,
                                               self.max_retry_wait)
@@ -218,7 +237,7 @@ class ComputeNodeUpdateActor(config.actor_class):
             else:
                 self.error_streak = 0
                 return result
-        return wrapper
+        return throttle_wrapper
 
     @_throttle_errors
     def sync_node(self, cloud_node, arvados_node):
@@ -269,7 +288,7 @@ class ComputeNodeMonitorActor(config.actor_class):
         if (self.arvados_node is None) or not timestamp_fresh(
               arvados_node_mtime(self.arvados_node), self.node_stale_after):
             return None
-        state = self.arvados_node['info'].get('slurm_state')
+        state = self.arvados_node['crunch_worker_state']
         if not state:
             return None
         result = state in states