def timestamp_fresh(timestamp, fresh_time):
    """Return True when `timestamp` is less than `fresh_time` seconds old.

    The age is measured against the current time reported by time.time(),
    so `timestamp` is compared on that same scale (seconds).
    """
    age = time.time() - timestamp
    return age < fresh_time
-def _retry(errors):
- """Retry decorator for an actor method that makes remote requests.
-
- Use this function to decorator an actor method, and pass in a tuple of
- exceptions to catch. This decorator will schedule retries of that method
- with exponential backoff if the original method raises any of the given
- errors.
- """
- def decorator(orig_func):
- @functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
- try:
- orig_func(self, *args, **kwargs)
- except errors as error:
- self._logger.warning(
- "Client error: %s - waiting %s seconds",
- error, self.retry_wait)
- self._timer.schedule(self.retry_wait,
- getattr(self._later, orig_func.__name__),
- *args, **kwargs)
- self.retry_wait = min(self.retry_wait * 2,
- self.max_retry_wait)
- else:
- self.retry_wait = self.min_retry_wait
- return wrapper
- return decorator
-
class BaseComputeNodeDriver(object):
"""Abstract base class for compute node drivers.
ComputeNodeDriverClass = BaseComputeNodeDriver
-class ComputeNodeSetupActor(config.actor_class):
class ComputeNodeStateChangeBase(config.actor_class):
    """Common machinery for actors that change a compute node's state.

    Subclasses get retry scheduling with exponential backoff (through the
    _retry decorator) and a subscriber list that is notified once the
    state change has finished.
    """
    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        self._logger = logging.getLogger(logger_name)
        self._timer = timer_actor
        # Proxy to this actor, used to queue follow-up calls on itself.
        self._later = self.actor_ref.proxy()
        # Backoff bounds; retry_wait is the wait used for the next retry.
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        # A set until _finished() runs, None afterwards.
        self.subscribers = set()

    @staticmethod
    def _retry(errors):
        """Build a decorator that retries an actor method on failure.

        Pass in a tuple of exceptions to catch.  When the decorated
        method raises one of them, the wrapper logs a warning, asks the
        timer actor to call the same method again (through this actor's
        proxy, with the same arguments), and doubles retry_wait, capped
        at max_retry_wait.  A call that finishes without raising resets
        retry_wait to min_retry_wait.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def wrapper(self, *args, **kwargs):
                try:
                    orig_func(self, *args, **kwargs)
                except errors as error:
                    delay = self.retry_wait
                    self._logger.warning(
                        "Client error: %s - waiting %s seconds",
                        error, delay)
                    retry_call = getattr(self._later, orig_func.__name__)
                    # NOTE(review): schedule() is handed the wait in
                    # seconds; confirm the timer actor treats its first
                    # argument as a delay rather than an absolute time.
                    self._timer.schedule(delay, retry_call, *args, **kwargs)
                    self.retry_wait = min(delay * 2, self.max_retry_wait)
                    return
                # Success path: the next failure starts from the shortest wait.
                self.retry_wait = self.min_retry_wait
            return wrapper
        return decorator

    def _finished(self):
        """Hand the subscribers to _notify_subscribers and close the list."""
        _notify_subscribers(self._later, self.subscribers)
        # None tells subscribe() to call any later subscriber directly.
        self.subscribers = None

    def subscribe(self, subscriber):
        """Register `subscriber`, or call it right away if already finished.

        Once _finished() has run the subscriber is invoked immediately
        with this actor's proxy; a pykka.ActorDeadError raised by that
        call is ignored.
        """
        if self.subscribers is not None:
            self.subscribers.add(subscriber)
            return
        try:
            subscriber(self._later)
        except pykka.ActorDeadError:
            pass
+
+
+class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
"""Actor to create and set up a cloud compute node.
This actor prepares an Arvados node record for a new compute node
def __init__(self, timer_actor, arvados_client, cloud_client,
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
- super(ComputeNodeSetupActor, self).__init__()
- self._timer = timer_actor
+ super(ComputeNodeSetupActor, self).__init__(
+ 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
self._arvados = arvados_client
self._cloud = cloud_client
- self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger('arvnodeman.nodeup')
self.cloud_size = cloud_size
- self.subscribers = set()
- self.min_retry_wait = retry_wait
- self.max_retry_wait = max_retry_wait
- self.retry_wait = retry_wait
self.arvados_node = None
self.cloud_node = None
if arvados_node is None:
else:
self._later.prepare_arvados_node(arvados_node)
- @_retry(config.ARVADOS_ERRORS)
@ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
    """Create an Arvados node with an empty body, then queue cloud creation.

    The result of the request is stored in self.arvados_node.  Errors
    listed in config.ARVADOS_ERRORS are handled by the retry decorator.
    """
    create_request = self._arvados.nodes().create(body={})
    self.arvados_node = create_request.execute()
    self._later.create_cloud_node()
- @_retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
self.arvados_node = self._arvados.nodes().update(
uuid=node['uuid'],
).execute()
self._later.create_cloud_node()
- @_retry(config.CLOUD_ERRORS)
@ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
def create_cloud_node(self):
    """Create the cloud node for self.arvados_node and signal completion.

    The new node is stored in self.cloud_node, after which _finished()
    is called.  Errors listed in config.CLOUD_ERRORS are handled by the
    retry decorator.
    """
    size = self.cloud_size
    self._logger.info("Creating cloud node with size %s.", size.name)
    self.cloud_node = self._cloud.create_node(size, self.arvados_node)
    self._logger.info("Cloud node %s created.", self.cloud_node.id)
    self._finished()
def stop_if_no_cloud_node(self):
    """Stop this actor unless a cloud node has already been recorded."""
    if self.cloud_node is not None:
        return
    self.stop()
- def subscribe(self, subscriber):
- if self.subscribers is None:
- try:
- subscriber(self._later)
- except pykka.ActorDeadError:
- pass
- else:
- self.subscribers.add(subscriber)
-
-class ComputeNodeShutdownActor(config.actor_class):
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor that destroys one cloud node, retrying as needed.

    The shutdown request is queued on the actor's own proxy as soon as
    the actor is constructed.
    """
    def __init__(self, timer_actor, cloud_client, cloud_node,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeShutdownActor, self).__init__(
            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
        self.cloud_node = cloud_node
        self._cloud = cloud_client
        # Start the shutdown immediately.
        self._later.shutdown_node()

    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def shutdown_node(self):
        """Destroy self.cloud_node, log it, and signal completion.

        Errors listed in config.CLOUD_ERRORS are handled by the retry
        decorator.
        """
        # NOTE(review): the return value of destroy_node() is ignored;
        # confirm the cloud client raises when the request fails.
        self._cloud.destroy_node(self.cloud_node)
        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
        self._finished()
class ComputeNodeUpdateActor(config.actor_class):