X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/44e01cf266a3c062b2f0f5bb3426672024367d38..8d2639525417aaa02240777454405f2249d505b0:/services/nodemanager/arvnodeman/computenode/__init__.py diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py index ae25428944..0d4ee7b52d 100644 --- a/services/nodemanager/arvnodeman/computenode/__init__.py +++ b/services/nodemanager/arvnodeman/computenode/__init__.py @@ -23,33 +23,6 @@ def arvados_node_mtime(node): def timestamp_fresh(timestamp, fresh_time): return (time.time() - timestamp) < fresh_time -def _retry(errors): - """Retry decorator for an actor method that makes remote requests. - - Use this function to decorator an actor method, and pass in a tuple of - exceptions to catch. This decorator will schedule retries of that method - with exponential backoff if the original method raises any of the given - errors. - """ - def decorator(orig_func): - @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): - try: - orig_func(self, *args, **kwargs) - except errors as error: - self._logger.warning( - "Client error: %s - waiting %s seconds", - error, self.retry_wait) - self._timer.schedule(self.retry_wait, - getattr(self._later, orig_func.__name__), - *args, **kwargs) - self.retry_wait = min(self.retry_wait * 2, - self.max_retry_wait) - else: - self.retry_wait = self.min_retry_wait - return wrapper - return decorator - class BaseComputeNodeDriver(object): """Abstract base class for compute node drivers. @@ -114,7 +87,66 @@ class BaseComputeNodeDriver(object): ComputeNodeDriverClass = BaseComputeNodeDriver -class ComputeNodeSetupActor(config.actor_class): +class ComputeNodeStateChangeBase(config.actor_class): + """Base class for actors that change a compute node's state. + + This base class takes care of retrying changes and notifying + subscribers when the change is finished. 
+ """ + def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait): + super(ComputeNodeStateChangeBase, self).__init__() + self._later = self.actor_ref.proxy() + self._timer = timer_actor + self._logger = logging.getLogger(logger_name) + self.min_retry_wait = retry_wait + self.max_retry_wait = max_retry_wait + self.retry_wait = retry_wait + self.subscribers = set() + + @staticmethod + def _retry(errors): + """Retry decorator for an actor method that makes remote requests. + + Use this function to decorate an actor method, and pass in a + tuple of exceptions to catch. This decorator will schedule + retries of that method with exponential backoff if the + original method raises any of the given errors. + """ + def decorator(orig_func): + @functools.wraps(orig_func) + def wrapper(self, *args, **kwargs): + try: + orig_func(self, *args, **kwargs) + except errors as error: + self._logger.warning( + "Client error: %s - waiting %s seconds", + error, self.retry_wait) + self._timer.schedule(self.retry_wait, + getattr(self._later, + orig_func.__name__), + *args, **kwargs) + self.retry_wait = min(self.retry_wait * 2, + self.max_retry_wait) + else: + self.retry_wait = self.min_retry_wait + return wrapper + return decorator + + def _finished(self): + _notify_subscribers(self._later, self.subscribers) + self.subscribers = None + + def subscribe(self, subscriber): + if self.subscribers is None: + try: + subscriber(self._later) + except pykka.ActorDeadError: + pass + else: + self.subscribers.add(subscriber) + + +class ComputeNodeSetupActor(ComputeNodeStateChangeBase): """Actor to create and set up a cloud compute node. 
This actor prepares an Arvados node record for a new compute node @@ -126,17 +158,11 @@ class ComputeNodeSetupActor(config.actor_class): def __init__(self, timer_actor, arvados_client, cloud_client, cloud_size, arvados_node=None, retry_wait=1, max_retry_wait=180): - super(ComputeNodeSetupActor, self).__init__() - self._timer = timer_actor + super(ComputeNodeSetupActor, self).__init__( + 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait) self._arvados = arvados_client self._cloud = cloud_client - self._later = self.actor_ref.proxy() - self._logger = logging.getLogger('arvnodeman.nodeup') self.cloud_size = cloud_size - self.subscribers = set() - self.min_retry_wait = retry_wait - self.max_retry_wait = max_retry_wait - self.retry_wait = retry_wait self.arvados_node = None self.cloud_node = None if arvados_node is None: @@ -144,12 +170,12 @@ class ComputeNodeSetupActor(config.actor_class): else: self._later.prepare_arvados_node(arvados_node) - @_retry(config.ARVADOS_ERRORS) + @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) def create_arvados_node(self): self.arvados_node = self._arvados.nodes().create(body={}).execute() self._later.create_cloud_node() - @_retry(config.ARVADOS_ERRORS) + @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) def prepare_arvados_node(self, node): self.arvados_node = self._arvados.nodes().update( uuid=node['uuid'], @@ -163,52 +189,38 @@ class ComputeNodeSetupActor(config.actor_class): ).execute() self._later.create_cloud_node() - @_retry(config.CLOUD_ERRORS) + @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS) def create_cloud_node(self): self._logger.info("Creating cloud node with size %s.", self.cloud_size.name) self.cloud_node = self._cloud.create_node(self.cloud_size, self.arvados_node) self._logger.info("Cloud node %s created.", self.cloud_node.id) - _notify_subscribers(self._later, self.subscribers) - self.subscribers = None + self._finished() def stop_if_no_cloud_node(self): if self.cloud_node is None: 
self.stop() - def subscribe(self, subscriber): - if self.subscribers is None: - try: - subscriber(self._later) - except pykka.ActorDeadError: - pass - else: - self.subscribers.add(subscriber) - -class ComputeNodeShutdownActor(config.actor_class): +class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): """Actor to shut down a compute node. This actor simply destroys a cloud node, retrying as needed. """ def __init__(self, timer_actor, cloud_client, cloud_node, retry_wait=1, max_retry_wait=180): - super(ComputeNodeShutdownActor, self).__init__() - self._timer = timer_actor + super(ComputeNodeShutdownActor, self).__init__( + 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait) self._cloud = cloud_client - self._later = self.actor_ref.proxy() - self._logger = logging.getLogger('arvnodeman.nodedown') self.cloud_node = cloud_node - self.min_retry_wait = retry_wait - self.max_retry_wait = max_retry_wait - self.retry_wait = retry_wait self._later.shutdown_node() - @_retry(config.CLOUD_ERRORS) + @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS) def shutdown_node(self): self._cloud.destroy_node(self.cloud_node) self._logger.info("Cloud node %s shut down.", self.cloud_node.id) + self._finished() class ComputeNodeUpdateActor(config.actor_class):