class BaseComputeNodeDriver(object):
    """Abstract base class for compute node drivers.

    libcloud abstracts away many of the differences between cloud providers,
    but managing compute nodes requires some cloud-specific features (e.g.,
    on EC2 we use tags to identify compute nodes). Compute node drivers
    are responsible for translating the node manager's cloud requests to a
    specific cloud's vocabulary.

    Subclasses must implement arvados_create_kwargs (to update node
    creation kwargs with information about the specific Arvados node
    record), sync_node, and node_start_time.  Subclasses are also
    expected to provide a SEARCH_CACHE mapping (typically a class-level
    dict) used by search_for().
    """
    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
        """Wrap a libcloud driver.

        Arguments:
        * auth_kwargs: keyword arguments used to instantiate driver_class.
        * list_kwargs: keyword arguments passed to every list_nodes call.
        * create_kwargs: base keyword arguments for every create_node call.
        * driver_class: the libcloud driver class to wrap.
        """
        self.real = driver_class(**auth_kwargs)
        self.list_kwargs = list_kwargs
        self.create_kwargs = create_kwargs

    def __getattr__(self, name):
        # Proxy non-extension methods to the real driver.  Private
        # ('_'-prefixed) and libcloud extension ('ex_'-prefixed) names
        # are never proxied.
        if (not name.startswith('_') and not name.startswith('ex_')
              and hasattr(self.real, name)):
            return getattr(self.real, name)
        else:
            # object has no __getattr__, so delegating to super() here
            # would raise a misleading "'super' object has no attribute
            # '__getattr__'" error.  Raise the conventional
            # AttributeError naming the missing attribute instead.
            raise AttributeError(name)

    def search_for(self, term, list_method, key=lambda item: item.id):
        """Return the one item from list_method whose key matches term.

        Results are memoized in self.SEARCH_CACHE (provided by the
        subclass), so each (list_method, term) pair is looked up on the
        cloud at most once.  Raises ValueError unless exactly one item
        matches.
        """
        cache_key = (list_method, term)
        if cache_key not in self.SEARCH_CACHE:
            results = [item for item in getattr(self.real, list_method)()
                       if key(item) == term]
            count = len(results)
            if count != 1:
                raise ValueError("{} returned {} results for '{}'".format(
                        list_method, count, term))
            self.SEARCH_CACHE[cache_key] = results[0]
        return self.SEARCH_CACHE[cache_key]

    def list_nodes(self):
        # Always apply the configured filter kwargs so we only see the
        # nodes this manager is responsible for.
        return self.real.list_nodes(**self.list_kwargs)

    def arvados_create_kwargs(self, arvados_node):
        # Subclasses return extra create_node kwargs derived from the
        # given Arvados node record.
        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")

    def create_node(self, size, arvados_node):
        """Create a cloud node of the given size for an Arvados node record."""
        kwargs = self.create_kwargs.copy()
        kwargs.update(self.arvados_create_kwargs(arvados_node))
        kwargs['size'] = size
        return self.real.create_node(**kwargs)

    def sync_node(self, cloud_node, arvados_node):
        # When a compute node first pings the API server, the API server
        # will automatically assign some attributes on the corresponding
        # node record, like hostname. This method should propagate that
        # information back to the cloud node appropriately.
        raise NotImplementedError("BaseComputeNodeDriver.sync_node")

    @classmethod
    def node_start_time(cls, node):
        # Subclasses return the start time of the given cloud node.
        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
-
-
# Default driver class alias; other modules look this name up to find
# the driver implementation to instantiate.
ComputeNodeDriverClass = BaseComputeNodeDriver
-
class ComputeNodeStateChangeBase(config.actor_class):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.  Subclasses perform the
    actual state change and call _finished() when it completes; methods
    that make remote requests should be wrapped with the _retry
    decorator.
    """
    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
        """Arguments:
        * logger_name: name passed to logging.getLogger for this actor.
        * timer_actor: actor used to schedule retries; assumed to offer
          schedule(delay, callable, *args, **kwargs) — confirm against
          the timer actor's implementation.
        * retry_wait: initial (and minimum) seconds between retries.
        * max_retry_wait: cap on the exponential backoff delay.
        """
        super(ComputeNodeStateChangeBase, self).__init__()
        # Proxy to this actor itself, so work can be re-dispatched
        # through the actor's own mailbox (e.g., scheduled retries).
        self._later = self.actor_ref.proxy()
        self._timer = timer_actor
        self._logger = logging.getLogger(logger_name)
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        # Current backoff delay: doubled after each failure (capped at
        # max_retry_wait), reset to min_retry_wait on success.
        self.retry_wait = retry_wait
        # Callables to notify when the change finishes.  Set to None by
        # _finished() to mark the change complete (see subscribe()).
        self.subscribers = set()

    @staticmethod
    def _retry(errors):
        """Retry decorator for an actor method that makes remote requests.

        Use this function to decorate an actor method, and pass in a
        tuple of exceptions to catch.  This decorator will schedule
        retries of that method with exponential backoff if the
        original method raises any of the given errors.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def wrapper(self, *args, **kwargs):
                try:
                    orig_func(self, *args, **kwargs)
                except errors as error:
                    self._logger.warning(
                        "Client error: %s - waiting %s seconds",
                        error, self.retry_wait)
                    # Re-deliver the same call (same arguments) through
                    # this actor's proxy after the current backoff delay.
                    self._timer.schedule(self.retry_wait,
                                         getattr(self._later,
                                                 orig_func.__name__),
                                         *args, **kwargs)
                    # Double the delay for the next failure, capped.
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                else:
                    # Success: reset the backoff to its minimum.
                    self.retry_wait = self.min_retry_wait
            return wrapper
        return decorator

    def _finished(self):
        # Notify every subscriber with our proxy, then clear the set to
        # mark this state change as complete.
        _notify_subscribers(self._later, self.subscribers)
        self.subscribers = None

    def subscribe(self, subscriber):
        """Register subscriber to be called with this actor's proxy when
        the state change finishes.  If it has already finished, the
        subscriber is called immediately instead.
        """
        if self.subscribers is None:
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                # Best-effort notification: the subscriber's actor is
                # already gone, so there is nothing left to tell.
                pass
        else:
            self.subscribers.add(subscriber)
-
-
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    Given a node size (and optionally an existing Arvados node record),
    this actor first makes sure an Arvados node record is ready —
    creating a fresh one, or scrubbing the one passed in — and then
    boots the corresponding cloud node.  Subscribers are notified once
    the cloud node has been created, which is the last setup step Node
    Manager is responsible for.
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.cloud_node = None
        self.arvados_node = None
        self._cloud = cloud_client
        self._arvados = arvados_client
        # Kick off the first step asynchronously via our own proxy.
        if arvados_node is not None:
            self._later.prepare_arvados_node(arvados_node)
        else:
            self._later.create_arvados_node()

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        """Create a brand-new Arvados node record, then boot a cloud node."""
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        """Scrub an existing Arvados node record for reuse, then boot."""
        reset_body = {'hostname': None,
                      'ip_address': None,
                      'slot_number': None,
                      'first_ping_at': None,
                      'last_ping_at': None,
                      'info': {'ec2_instance_id': None,
                               'last_action': "Prepared by Node Manager"}}
        self.arvados_node = self._arvados.nodes().update(
            uuid=node['uuid'], body=reset_body).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def create_cloud_node(self):
        """Ask the cloud driver for a node, then notify subscribers."""
        self._logger.info("Creating cloud node with size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        # Safe-stop helper: shut this actor down only if it has not yet
        # created a cloud node.
        if self.cloud_node is not None:
            return
        self.stop()
-
-
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    Destroys the given cloud node, retrying on cloud errors until the
    request goes through, then notifies subscribers.
    """
    def __init__(self, timer_actor, cloud_client, cloud_node,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeShutdownActor, self).__init__(
            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
        self.cloud_node = cloud_node
        self._cloud = cloud_client
        # Begin the shutdown as soon as the actor is running.
        self._later.shutdown_node()

    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def shutdown_node(self):
        """Destroy the cloud node, then notify subscribers."""
        self._cloud.destroy_node(self.cloud_node)
        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
        self._finished()
-
-
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    Small maintenance requests against the cloud driver are funneled
    through this actor; ComputeNodeMonitorActors use it to perform
    maintenance tasks on themselves.  Routing everything through one
    dedicated actor gives us a place to control the flow of requests —
    in particular, backing off when the cloud returns errors.

    Unlike the state-change actors there is no subscription mechanism:
    each method simply returns the real driver's result.  Callers that
    care about that result should read it from the Future the proxy
    method returns, and be prepared for cloud driver exceptions.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self.max_retry_wait = max_retry_wait
        # Count of consecutive cloud errors; drives the backoff below.
        self.error_streak = 0
        # Earliest wall-clock time at which the next request may run.
        self.next_request_time = time.time()
        self._cloud = cloud_factory()

    def _throttle_errors(orig_func):
        """Decorator: delay requests while the cloud is returning errors.

        Sleeps until next_request_time before invoking orig_func, and
        pushes next_request_time further out (exponentially in the
        error streak, capped at max_retry_wait) whenever the call
        raises a cloud error.
        """
        @functools.wraps(orig_func)
        def throttled_call(self, *args, **kwargs):
            delay = self.next_request_time - time.time()
            if delay > 0:
                time.sleep(delay)
            self.next_request_time = time.time()
            try:
                outcome = orig_func(self, *args, **kwargs)
            except config.CLOUD_ERRORS:
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                raise
            # Only reached on success, since the except clause re-raises.
            self.error_streak = 0
            return outcome
        return throttled_call

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        """Propagate Arvados node record data to the cloud node."""
        return self._cloud.sync_node(cloud_node, arvados_node)
-
-