compute drivers.
from __future__ import absolute_import, print_function
import calendar
+import functools
import itertools
import re
import time
else:
return not timestamp_fresh(arvados_timestamp(arvados_node["last_ping_at"]), fresh_time)
+def _retry(errors=()):
+ """Retry decorator for an actor method that makes remote requests.
+
+ Use this function to decorator an actor method, and pass in a
+ tuple of exceptions to catch. This decorator will schedule
+ retries of that method with exponential backoff if the
+ original method raises a known cloud driver error, or any of the
+ given exception types.
+ """
+ def decorator(orig_func):
+ @functools.wraps(orig_func)
+ def retry_wrapper(self, *args, **kwargs):
+ start_time = time.time()
+ try:
+ return orig_func(self, *args, **kwargs)
+ except Exception as error:
+ if not (isinstance(error, errors) or
+ self._cloud.is_cloud_exception(error)):
+ raise
+ self._logger.warning(
+ "Client error: %s - waiting %s seconds",
+ error, self.retry_wait)
+ self._timer.schedule(start_time + self.retry_wait,
+ getattr(self._later,
+ orig_func.__name__),
+ *args, **kwargs)
+ self.retry_wait = min(self.retry_wait * 2,
+ self.max_retry_wait)
+ else:
+ self.retry_wait = self.min_retry_wait
+ return retry_wrapper
+ return decorator
+
class ShutdownTimer(object):
"""Keep track of a cloud node's shutdown windows.
import pykka
from .. import \
- arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
+ arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
+ arvados_node_missing, _retry
from ...clientactor import _notify_subscribers
from ... import config
self.retry_wait = retry_wait
self.subscribers = set()
- @staticmethod
- def _retry(errors=()):
- """Retry decorator for an actor method that makes remote requests.
-
- Use this function to decorator an actor method, and pass in a
- tuple of exceptions to catch. This decorator will schedule
- retries of that method with exponential backoff if the
- original method raises a known cloud driver error, or any of the
- given exception types.
- """
- def decorator(orig_func):
- @functools.wraps(orig_func)
- def retry_wrapper(self, *args, **kwargs):
- start_time = time.time()
- try:
- orig_func(self, *args, **kwargs)
- except Exception as error:
- if not (isinstance(error, errors) or
- self._cloud.is_cloud_exception(error)):
- raise
- self._logger.warning(
- "Client error: %s - waiting %s seconds",
- error, self.retry_wait)
- self._timer.schedule(start_time + self.retry_wait,
- getattr(self._later,
- orig_func.__name__),
- *args, **kwargs)
- self.retry_wait = min(self.retry_wait * 2,
- self.max_retry_wait)
- else:
- self.retry_wait = self.min_retry_wait
- return retry_wrapper
- return decorator
-
def _finished(self):
_notify_subscribers(self._later, self.subscribers)
self.subscribers = None
else:
self._later.prepare_arvados_node(arvados_node)
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @_retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @_retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
self.arvados_node = self._clean_arvados_node(
node, "Prepared by Node Manager")
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry()
+ @_retry()
def create_cloud_node(self):
self._logger.info("Creating cloud node with size %s.",
self.cloud_size.name)
self._logger.info("Cloud node %s created.", self.cloud_node.id)
self._later.update_arvados_node_properties()
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @_retry(config.ARVADOS_ERRORS)
def update_arvados_node_properties(self):
"""Tell Arvados some details about the cloud node.
self._logger.info("%s updated properties.", self.arvados_node['uuid'])
self._later.post_create()
- @ComputeNodeStateChangeBase._retry()
+ @_retry()
def post_create(self):
self._cloud.post_create_node(self.cloud_node)
self._logger.info("%s post-create work done.", self.cloud_node.id)
return stop_wrapper
@_stop_if_window_closed
- @ComputeNodeStateChangeBase._retry()
+ @_retry()
def shutdown_node(self):
if not self._cloud.destroy_node(self.cloud_node):
if self._cloud.broken(self.cloud_node):
else:
self._later.clean_arvados_node(arv_node)
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @_retry(config.ARVADOS_ERRORS)
def clean_arvados_node(self, arvados_node):
self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
self._finished(success_flag=True)
from . import \
ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
from . import ComputeNodeShutdownActor as ShutdownActorBase
+from .. import _retry
class ComputeNodeShutdownActor(ShutdownActorBase):
SLURM_END_STATES = frozenset(['down\n', 'down*\n',
# of the excessive memory usage that result in the "Cannot allocate memory"
# error are still being investigated.
- @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+ @_retry((subprocess.CalledProcessError, OSError))
def cancel_shutdown(self, reason):
if self._nodename:
if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
pass
return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
- @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+ @_retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
def issue_slurm_drain(self):
self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
- @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+ @_retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
def await_slurm_drain(self):
output = self._get_slurm_state()
from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
from ...config import NETWORK_ERRORS
+from .. import _retry
class BaseComputeNodeDriver(object):
"""Abstract base class for compute node drivers.
"""
CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
- def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+ @_retry()
+ def _create_driver(self, driver_class, **auth_kwargs):
+ return driver_class(**auth_kwargs)
+
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+ driver_class, retry_wait=1, max_retry_wait=180):
"""Base initializer for compute node drivers.
Arguments:
libcloud driver's create_node method to create a new compute node.
* driver_class: The class of a libcloud driver to use.
"""
- self.real = driver_class(**auth_kwargs)
+ self.min_retry_wait = retry_wait
+ self.max_retry_wait = max_retry_wait
+ self.real = self._create_driver(driver_class, **auth_kwargs)
self.list_kwargs = list_kwargs
self.create_kwargs = create_kwargs
# Transform entries in create_kwargs. For each key K, if this class