4139: Node Manager more closely tracks nodes it boots.
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
index ae25428944dd8a967de0f60b6462197acfff4fb2..0d4ee7b52d4991b3d2d925947d9a616ccf35383c 100644 (file)
@@ -23,33 +23,6 @@ def arvados_node_mtime(node):
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
 
-def _retry(errors):
-    """Retry decorator for an actor method that makes remote requests.
-
-    Use this function to decorator an actor method, and pass in a tuple of
-    exceptions to catch.  This decorator will schedule retries of that method
-    with exponential backoff if the original method raises any of the given
-    errors.
-    """
-    def decorator(orig_func):
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            try:
-                orig_func(self, *args, **kwargs)
-            except errors as error:
-                self._logger.warning(
-                    "Client error: %s - waiting %s seconds",
-                    error, self.retry_wait)
-                self._timer.schedule(self.retry_wait,
-                                     getattr(self._later, orig_func.__name__),
-                                     *args, **kwargs)
-                self.retry_wait = min(self.retry_wait * 2,
-                                      self.max_retry_wait)
-            else:
-                self.retry_wait = self.min_retry_wait
-        return wrapper
-    return decorator
-
 class BaseComputeNodeDriver(object):
     """Abstract base class for compute node drivers.
 
@@ -114,7 +87,66 @@ class BaseComputeNodeDriver(object):
 
 ComputeNodeDriverClass = BaseComputeNodeDriver
 
-class ComputeNodeSetupActor(config.actor_class):
+class ComputeNodeStateChangeBase(config.actor_class):
+    """Base class for actors that change a compute node's state.
+
+    This base class takes care of retrying changes and notifying
+    subscribers when the change is finished.
+    """
+    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+        super(ComputeNodeStateChangeBase, self).__init__()
+        self._later = self.actor_ref.proxy()
+        self._timer = timer_actor
+        self._logger = logging.getLogger(logger_name)
+        self.min_retry_wait = retry_wait
+        self.max_retry_wait = max_retry_wait
+        self.retry_wait = retry_wait
+        self.subscribers = set()
+
+    @staticmethod
+    def _retry(errors):
+        """Retry decorator for an actor method that makes remote requests.
+
+        Decorate an actor method with this, passing a tuple of exceptions
+        to catch.  When the method raises one, the call is rescheduled via
+        self._timer after self.retry_wait seconds, doubling the wait up to
+        self.max_retry_wait; a successful call resets it to min_retry_wait.
+        """
+        def decorator(orig_func):
+            @functools.wraps(orig_func)
+            def wrapper(self, *args, **kwargs):
+                try:
+                    orig_func(self, *args, **kwargs)
+                except errors as error:
+                    self._logger.warning(
+                        "Client error: %s - waiting %s seconds",
+                        error, self.retry_wait)
+                    self._timer.schedule(self.retry_wait,
+                                         getattr(self._later,
+                                                 orig_func.__name__),
+                                         *args, **kwargs)
+                    self.retry_wait = min(self.retry_wait * 2,
+                                          self.max_retry_wait)
+                else:
+                    self.retry_wait = self.min_retry_wait
+            return wrapper
+        return decorator
+
+    def _finished(self):
+        _notify_subscribers(self._later, self.subscribers)
+        self.subscribers = None
+
+    def subscribe(self, subscriber):
+        if self.subscribers is None:
+            try:
+                subscriber(self._later)
+            except pykka.ActorDeadError:
+                pass
+        else:
+            self.subscribers.add(subscriber)
+
+
+class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
 
     This actor prepares an Arvados node record for a new compute node
@@ -126,17 +158,11 @@ class ComputeNodeSetupActor(config.actor_class):
     def __init__(self, timer_actor, arvados_client, cloud_client,
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeSetupActor, self).__init__()
-        self._timer = timer_actor
+        super(ComputeNodeSetupActor, self).__init__(
+            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
         self._arvados = arvados_client
         self._cloud = cloud_client
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.nodeup')
         self.cloud_size = cloud_size
-        self.subscribers = set()
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
         self.arvados_node = None
         self.cloud_node = None
         if arvados_node is None:
@@ -144,12 +170,12 @@ class ComputeNodeSetupActor(config.actor_class):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @_retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @_retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
         self.arvados_node = self._arvados.nodes().update(
             uuid=node['uuid'],
@@ -163,52 +189,38 @@ class ComputeNodeSetupActor(config.actor_class):
             ).execute()
         self._later.create_cloud_node()
 
-    @_retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
     def create_cloud_node(self):
         self._logger.info("Creating cloud node with size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
-        _notify_subscribers(self._later, self.subscribers)
-        self.subscribers = None
+        self._finished()
 
     def stop_if_no_cloud_node(self):
         if self.cloud_node is None:
             self.stop()
 
-    def subscribe(self, subscriber):
-        if self.subscribers is None:
-            try:
-                subscriber(self._later)
-            except pykka.ActorDeadError:
-                pass
-        else:
-            self.subscribers.add(subscriber)
-
 
-class ComputeNodeShutdownActor(config.actor_class):
+class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     """Actor to shut down a compute node.
 
     This actor simply destroys a cloud node, retrying as needed.
     """
     def __init__(self, timer_actor, cloud_client, cloud_node,
                  retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeShutdownActor, self).__init__()
-        self._timer = timer_actor
+        super(ComputeNodeShutdownActor, self).__init__(
+            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
         self._cloud = cloud_client
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.nodedown')
         self.cloud_node = cloud_node
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
         self._later.shutdown_node()
 
-    @_retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
     def shutdown_node(self):
         self._cloud.destroy_node(self.cloud_node)
         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+        self._finished()
 
 
 class ComputeNodeUpdateActor(config.actor_class):