Merge branch 'master' of git.curoverse.com:arvados into 3408-production-datamanager
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index ae0a65b9317e86f8d53afa00ba7b9483f047c4aa..48e8dcf45888de232ba9004c290023e6bf5999b7 100644 (file)
@@ -19,32 +19,38 @@ class ComputeNodeStateChangeBase(config.actor_class):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+    def __init__(self, logger_name, cloud_client, timer_actor,
+                 retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
         self._logger = logging.getLogger(logger_name)
+        self._cloud = cloud_client
+        self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self.retry_wait = retry_wait
         self.subscribers = set()
 
     @staticmethod
-    def _retry(errors):
+    def _retry(errors=()):
         """Retry decorator for an actor method that makes remote requests.
 
         Use this function to decorate an actor method, and pass in a
         tuple of exceptions to catch.  This decorator will schedule
         retries of that method with exponential backoff if the
         """Retry decorator for an actor method that makes remote requests.
 
         Use this function to decorator an actor method, and pass in a
         tuple of exceptions to catch.  This decorator will schedule
         retries of that method with exponential backoff if the
-        original method raises any of the given errors.
+        original method raises a known cloud driver error, or any of the
+        given exception types.
         """
         def decorator(orig_func):
             @functools.wraps(orig_func)
         """
         def decorator(orig_func):
             @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
+            def retry_wrapper(self, *args, **kwargs):
                 start_time = time.time()
                 try:
                     orig_func(self, *args, **kwargs)
-                except errors as error:
+                except Exception as error:
+                    if not (isinstance(error, errors) or
+                            self._cloud.is_cloud_exception(error)):
+                        raise
                     self._logger.warning(
                         "Client error: %s - waiting %s seconds",
                         error, self.retry_wait)
@@ -56,7 +62,7 @@ class ComputeNodeStateChangeBase(config.actor_class):
                                           self.max_retry_wait)
                 else:
                     self.retry_wait = self.min_retry_wait
-            return wrapper
+            return retry_wrapper
         return decorator
 
     def _finished(self):
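
The net effect of this hunk: `errors` is now optional, and the wrapper catches everything, re-raising anything that is neither an instance of the `errors` tuple nor recognized by the cloud client's `is_cloud_exception` predicate. A minimal, self-contained sketch of the same pattern, with a blocking loop and `time.sleep` standing in for the timer-actor scheduling the real code uses:

    import functools
    import time

    def _retry(errors=()):
        # Sketch only: retry on the given error types, plus anything the
        # cloud client classifies as a cloud error; re-raise the rest.
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                while True:
                    try:
                        result = orig_func(self, *args, **kwargs)
                    except Exception as error:
                        if not (isinstance(error, errors) or
                                self._cloud.is_cloud_exception(error)):
                            raise
                        time.sleep(self.retry_wait)  # real code schedules via self._timer
                        self.retry_wait = min(self.retry_wait * 2,
                                              self.max_retry_wait)
                    else:
                        self.retry_wait = self.min_retry_wait
                        return result
            return retry_wrapper
        return decorator

Since the tuple is additive on top of the cloud-error check, the bare `@_retry()` used by the call sites below means: retry exactly what the cloud client flags as transient.
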
@@ -86,9 +92,9 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+            'arvnodeman.nodeup', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._arvados = arvados_client
-        self._cloud = cloud_client
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
@@ -97,12 +103,12 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def prepare_arvados_node(self, node):
         self.arvados_node = self._arvados.nodes().update(
             uuid=node['uuid'],
@@ -116,13 +122,19 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
             ).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_cloud_node(self):
         self._logger.info("Creating cloud node with size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
+        self._later.post_create()
+
+    @ComputeNodeStateChangeBase._retry()
+    def post_create(self):
+        self._cloud.post_create_node(self.cloud_node)
+        self._logger.info("%s post-create work done.", self.cloud_node.id)
         self._finished()
 
     def stop_if_no_cloud_node(self):
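
Creation is now a two-message sequence: `create_cloud_node` finishes by posting `post_create` back to its own proxy, so the driver's `post_create_node` work gets an independent retry loop, and a transient failure there never re-runs the already successful `create_node` call. The chaining pattern in isolation, as a runnable pykka sketch (the class name and the simplified client calls are illustrative, not taken from this diff):

    import pykka

    class TwoStepSetup(pykka.ThreadingActor):
        # Each step ends by sending the next step to itself through the
        # actor proxy, so every step is a separate message that can be
        # retried on its own.
        def __init__(self, cloud_client):
            super(TwoStepSetup, self).__init__()
            self._later = self.actor_ref.proxy()
            self._cloud = cloud_client

        def create(self):
            self.node = self._cloud.create_node()    # simplified signature
            self._later.post_create()

        def post_create(self):
            self._cloud.post_create_node(self.node)

    # Usage: setup = TwoStepSetup.start(client).proxy(); setup.create()
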
@@ -136,12 +148,18 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     This actor simply destroys a cloud node, retrying as needed.
     """
     def __init__(self, timer_actor, cloud_client, node_monitor,
-                 retry_wait=1, max_retry_wait=180):
+                 cancellable=True, retry_wait=1, max_retry_wait=180):
+        # If a ShutdownActor is cancellable, it will ask the
+        # ComputeNodeMonitorActor if it's still eligible before taking each
+        # action, and stop the shutdown process if the node is no longer
+        # eligible.  Normal shutdowns based on job demand should be
+        # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
+            'arvnodeman.nodedown', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
+        self.cancellable = cancellable
         self.success = None
 
     def on_start(self):
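
Hypothetical construction of the two kinds of shutdown under the new signature (the daemon code that actually starts these actors is outside this diff):

    # Demand-driven shutdown: may be called off if the node becomes
    # eligible for work again before it is destroyed.
    shutdown = ComputeNodeShutdownActor.start(
        timer_actor, cloud_client, node_monitor, cancellable=True).proxy()

    # Shutdown of a misbehaving node: runs to completion no matter what
    # the monitor reports.
    shutdown = ComputeNodeShutdownActor.start(
        timer_actor, cloud_client, node_monitor, cancellable=False).proxy()
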
@@ -153,8 +171,9 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            if not self._monitor.shutdown_eligible().get():
+        def stop_wrapper(self, *args, **kwargs):
+            if (self.cancellable and
+                  (not self._monitor.shutdown_eligible().get())):
                 self._logger.info(
                     "Cloud node %s shutdown cancelled - no longer eligible.",
                     self.cloud_node.id)
@@ -162,10 +181,10 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
                 return None
             else:
                 return orig_func(self, *args, **kwargs)
-        return wrapper
+        return stop_wrapper
 
     @_stop_if_window_closed
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def shutdown_node(self):
         if self._cloud.destroy_node(self.cloud_node):
             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
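
Because `_stop_if_window_closed` is the outermost decorator, any path that re-enters `shutdown_node` through the actor proxy re-runs the eligibility check first. The guard in isolation; the cleanup elided between the two hunks above is represented by a hypothetical `_cancel()`:

    import functools

    def _stop_if_window_closed(orig_func):
        # Skip the wrapped step when this shutdown is cancellable and the
        # monitor reports the node is no longer eligible for shutdown.
        @functools.wraps(orig_func)
        def stop_wrapper(self, *args, **kwargs):
            if (self.cancellable and
                  not self._monitor.shutdown_eligible().get()):
                self._cancel()  # hypothetical stand-in for the elided cleanup
                return None
            else:
                return orig_func(self, *args, **kwargs)
        return stop_wrapper
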
@@ -203,14 +222,14 @@ class ComputeNodeUpdateActor(config.actor_class):
 
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def throttle_wrapper(self, *args, **kwargs):
             throttle_time = self.next_request_time - time.time()
             if throttle_time > 0:
                 time.sleep(throttle_time)
             self.next_request_time = time.time()
             try:
                 result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
+            except Exception as error:
                 self.error_streak += 1
                 self.next_request_time += min(2 ** self.error_streak,
                                               self.max_retry_wait)
@@ -218,7 +237,7 @@ class ComputeNodeUpdateActor(config.actor_class):
             else:
                 self.error_streak = 0
                 return result
-        return wrapper
+        return throttle_wrapper
 
     @_throttle_errors
     def sync_node(self, cloud_node, arvados_node):
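
`_throttle_errors` now penalizes any failure rather than only `config.CLOUD_ERRORS`: each consecutive error pushes the earliest time of the next request out by 2**error_streak seconds, capped at `max_retry_wait`, and a success resets the streak. A self-contained sketch of the wrapper; the single line elided between the last two hunks is assumed here to be a bare `raise`:

    import functools
    import time

    def _throttle_errors(orig_func):
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            # Wait out any penalty accrued from earlier failures.
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except Exception:
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                raise  # assumed: the elided line re-raises the error
            else:
                self.error_streak = 0
                return result
        return throttle_wrapper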