This base class takes care of retrying changes and notifying
subscribers when the change is finished.
"""
- def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+ def __init__(self, logger_name, cloud_client, timer_actor,
+ retry_wait, max_retry_wait):
super(ComputeNodeStateChangeBase, self).__init__()
self._later = self.actor_ref.proxy()
- self._timer = timer_actor
self._logger = logging.getLogger(logger_name)
+ self._cloud = cloud_client
+ self._timer = timer_actor
self.min_retry_wait = retry_wait
self.max_retry_wait = max_retry_wait
self.retry_wait = retry_wait
self.subscribers = set()
@staticmethod
- def _retry(errors):
+ def _retry(errors=()):
"""Retry decorator for an actor method that makes remote requests.
Use this function to decorate an actor method, and pass in a
tuple of exceptions to catch. This decorator will schedule
retries of that method with exponential backoff if the
- original method raises any of the given errors.
+ original method raises a known cloud driver error, or any of the
+ given exception types.
"""
def decorator(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def retry_wrapper(self, *args, **kwargs):
start_time = time.time()
try:
orig_func(self, *args, **kwargs)
- except errors as error:
+ except Exception as error:
+ if not (isinstance(error, errors) or
+ self._cloud.is_cloud_exception(error)):
+ raise
self._logger.warning(
"Client error: %s - waiting %s seconds",
error, self.retry_wait)
self.max_retry_wait)
else:
self.retry_wait = self.min_retry_wait
- return wrapper
+ return retry_wrapper
return decorator
def _finished(self):
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+ 'arvnodeman.nodeup', cloud_client, timer_actor,
+ retry_wait, max_retry_wait)
self._arvados = arvados_client
- self._cloud = cloud_client
self.cloud_size = cloud_size
self.arvados_node = None
self.cloud_node = None
else:
self._later.prepare_arvados_node(arvados_node)
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._retry()
def create_arvados_node(self):
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._retry()
def prepare_arvados_node(self, node):
self.arvados_node = self._arvados.nodes().update(
uuid=node['uuid'],
).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+ @ComputeNodeStateChangeBase._retry()
def create_cloud_node(self):
self._logger.info("Creating cloud node with size %s.",
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
self._logger.info("Cloud node %s created.", self.cloud_node.id)
+ self._later.post_create()
+
+ @ComputeNodeStateChangeBase._retry()
+ def post_create(self):
+ self._cloud.post_create_node(self.cloud_node)
+ self._logger.info("%s post-create work done.", self.cloud_node.id)
self._finished()
def stop_if_no_cloud_node(self):
# eligible. Normal shutdowns based on job demand should be
# cancellable; shutdowns based on node misbehavior should not.
super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
- self._cloud = cloud_client
+ 'arvnodeman.nodedown', cloud_client, timer_actor,
+ retry_wait, max_retry_wait)
self._monitor = node_monitor.proxy()
self.cloud_node = self._monitor.cloud_node.get()
self.cancellable = cancellable
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def stop_wrapper(self, *args, **kwargs):
if (self.cancellable and
(not self._monitor.shutdown_eligible().get())):
self._logger.info(
return None
else:
return orig_func(self, *args, **kwargs)
- return wrapper
+ return stop_wrapper
@_stop_if_window_closed
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+ @ComputeNodeStateChangeBase._retry()
def shutdown_node(self):
if self._cloud.destroy_node(self.cloud_node):
self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
def _throttle_errors(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def throttle_wrapper(self, *args, **kwargs):
throttle_time = self.next_request_time - time.time()
if throttle_time > 0:
time.sleep(throttle_time)
self.next_request_time = time.time()
try:
result = orig_func(self, *args, **kwargs)
- except config.CLOUD_ERRORS:
+ except Exception as error:
self.error_streak += 1
self.next_request_time += min(2 ** self.error_streak,
self.max_retry_wait)
else:
self.error_streak = 0
return result
- return wrapper
+ return throttle_wrapper
@_throttle_errors
def sync_node(self, cloud_node, arvados_node):