X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fe5ee2e37364330b4a58c0c41e8a0b627e8cb1f2..c4fa80c6ed2445e1e384455944eb6c4108906cad:/services/nodemanager/arvnodeman/computenode/__init__.py diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py index 0d4ee7b52d..f5186074c6 100644 --- a/services/nodemanager/arvnodeman/computenode/__init__.py +++ b/services/nodemanager/arvnodeman/computenode/__init__.py @@ -2,273 +2,34 @@ from __future__ import absolute_import, print_function -import functools +import calendar import itertools -import logging +import re import time -import pykka - -from ..clientactor import _notify_subscribers -from .. import config +ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ' +ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$') def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'): hostname = arvados_node.get('hostname') or default_hostname return '{}.{}'.format(hostname, arvados_node['domain']) def arvados_node_mtime(node): - return time.mktime(time.strptime(node['modified_at'] + 'UTC', - '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone + return arvados_timestamp(node['modified_at']) + +def arvados_timestamp(timestr): + subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr) + if subsec_match is None: + subsecs = .0 + else: + subsecs = float(subsec_match.group(1)) + timestr = timestr[:subsec_match.start()] + 'Z' + return calendar.timegm(time.strptime(timestr + 'UTC', + ARVADOS_TIMEFMT + '%Z')) def timestamp_fresh(timestamp, fresh_time): return (time.time() - timestamp) < fresh_time -class BaseComputeNodeDriver(object): - """Abstract base class for compute node drivers. - - libcloud abstracts away many of the differences between cloud providers, - but managing compute nodes requires some cloud-specific features (e.g., - on EC2 we use tags to identify compute nodes). Compute node drivers - are responsible for translating the node manager's cloud requests to a - specific cloud's vocabulary. - - Subclasses must implement arvados_create_kwargs (to update node - creation kwargs with information about the specific Arvados node - record), sync_node, and node_start_time. - """ - def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class): - self.real = driver_class(**auth_kwargs) - self.list_kwargs = list_kwargs - self.create_kwargs = create_kwargs - - def __getattr__(self, name): - # Proxy non-extension methods to the real driver. - if (not name.startswith('_') and not name.startswith('ex_') - and hasattr(self.real, name)): - return getattr(self.real, name) - else: - return super(BaseComputeNodeDriver, self).__getattr__(name) - - def search_for(self, term, list_method, key=lambda item: item.id): - cache_key = (list_method, term) - if cache_key not in self.SEARCH_CACHE: - results = [item for item in getattr(self.real, list_method)() - if key(item) == term] - count = len(results) - if count != 1: - raise ValueError("{} returned {} results for '{}'".format( - list_method, count, term)) - self.SEARCH_CACHE[cache_key] = results[0] - return self.SEARCH_CACHE[cache_key] - - def list_nodes(self): - return self.real.list_nodes(**self.list_kwargs) - - def arvados_create_kwargs(self, arvados_node): - raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs") - - def create_node(self, size, arvados_node): - kwargs = self.create_kwargs.copy() - kwargs.update(self.arvados_create_kwargs(arvados_node)) - kwargs['size'] = size - return self.real.create_node(**kwargs) - - def sync_node(self, cloud_node, arvados_node): - # When a compute node first pings the API server, the API server - # will automatically assign some attributes on the corresponding - # node record, like hostname. This method should propagate that - # information back to the cloud node appropriately. - raise NotImplementedError("BaseComputeNodeDriver.sync_node") - - @classmethod - def node_start_time(cls, node): - raise NotImplementedError("BaseComputeNodeDriver.node_start_time") - - -ComputeNodeDriverClass = BaseComputeNodeDriver - -class ComputeNodeStateChangeBase(config.actor_class): - """Base class for actors that change a compute node's state. - - This base class takes care of retrying changes and notifying - subscribers when the change is finished. - """ - def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait): - super(ComputeNodeStateChangeBase, self).__init__() - self._later = self.actor_ref.proxy() - self._timer = timer_actor - self._logger = logging.getLogger(logger_name) - self.min_retry_wait = retry_wait - self.max_retry_wait = max_retry_wait - self.retry_wait = retry_wait - self.subscribers = set() - - @staticmethod - def _retry(errors): - """Retry decorator for an actor method that makes remote requests. - - Use this function to decorator an actor method, and pass in a - tuple of exceptions to catch. This decorator will schedule - retries of that method with exponential backoff if the - original method raises any of the given errors. - """ - def decorator(orig_func): - @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): - try: - orig_func(self, *args, **kwargs) - except errors as error: - self._logger.warning( - "Client error: %s - waiting %s seconds", - error, self.retry_wait) - self._timer.schedule(self.retry_wait, - getattr(self._later, - orig_func.__name__), - *args, **kwargs) - self.retry_wait = min(self.retry_wait * 2, - self.max_retry_wait) - else: - self.retry_wait = self.min_retry_wait - return wrapper - return decorator - - def _finished(self): - _notify_subscribers(self._later, self.subscribers) - self.subscribers = None - - def subscribe(self, subscriber): - if self.subscribers is None: - try: - subscriber(self._later) - except pykka.ActorDeadError: - pass - else: - self.subscribers.add(subscriber) - - -class ComputeNodeSetupActor(ComputeNodeStateChangeBase): - """Actor to create and set up a cloud compute node. - - This actor prepares an Arvados node record for a new compute node - (either creating one or cleaning one passed in), then boots the - actual compute node. It notifies subscribers when the cloud node - is successfully created (the last step in the process for Node - Manager to handle). - """ - def __init__(self, timer_actor, arvados_client, cloud_client, - cloud_size, arvados_node=None, - retry_wait=1, max_retry_wait=180): - super(ComputeNodeSetupActor, self).__init__( - 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait) - self._arvados = arvados_client - self._cloud = cloud_client - self.cloud_size = cloud_size - self.arvados_node = None - self.cloud_node = None - if arvados_node is None: - self._later.create_arvados_node() - else: - self._later.prepare_arvados_node(arvados_node) - - @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) - def create_arvados_node(self): - self.arvados_node = self._arvados.nodes().create(body={}).execute() - self._later.create_cloud_node() - - @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) - def prepare_arvados_node(self, node): - self.arvados_node = self._arvados.nodes().update( - uuid=node['uuid'], - body={'hostname': None, - 'ip_address': None, - 'slot_number': None, - 'first_ping_at': None, - 'last_ping_at': None, - 'info': {'ec2_instance_id': None, - 'last_action': "Prepared by Node Manager"}} - ).execute() - self._later.create_cloud_node() - - @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS) - def create_cloud_node(self): - self._logger.info("Creating cloud node with size %s.", - self.cloud_size.name) - self.cloud_node = self._cloud.create_node(self.cloud_size, - self.arvados_node) - self._logger.info("Cloud node %s created.", self.cloud_node.id) - self._finished() - - def stop_if_no_cloud_node(self): - if self.cloud_node is None: - self.stop() - - -class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): - """Actor to shut down a compute node. - - This actor simply destroys a cloud node, retrying as needed. - """ - def __init__(self, timer_actor, cloud_client, cloud_node, - retry_wait=1, max_retry_wait=180): - super(ComputeNodeShutdownActor, self).__init__( - 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait) - self._cloud = cloud_client - self.cloud_node = cloud_node - self._later.shutdown_node() - - @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS) - def shutdown_node(self): - self._cloud.destroy_node(self.cloud_node) - self._logger.info("Cloud node %s shut down.", self.cloud_node.id) - self._finished() - - -class ComputeNodeUpdateActor(config.actor_class): - """Actor to dispatch one-off cloud management requests. - - This actor receives requests for small cloud updates, and - dispatches them to a real driver. ComputeNodeMonitorActors use - this to perform maintenance tasks on themselves. Having a - dedicated actor for this gives us the opportunity to control the - flow of requests; e.g., by backing off when errors occur. - - This actor is most like a "traditional" Pykka actor: there's no - subscribing, but instead methods return real driver results. If - you're interested in those results, you should get them from the - Future that the proxy method returns. Be prepared to handle exceptions - from the cloud driver when you do. - """ - def __init__(self, cloud_factory, max_retry_wait=180): - super(ComputeNodeUpdateActor, self).__init__() - self._cloud = cloud_factory() - self.max_retry_wait = max_retry_wait - self.error_streak = 0 - self.next_request_time = time.time() - - def _throttle_errors(orig_func): - @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): - throttle_time = self.next_request_time - time.time() - if throttle_time > 0: - time.sleep(throttle_time) - self.next_request_time = time.time() - try: - result = orig_func(self, *args, **kwargs) - except config.CLOUD_ERRORS: - self.error_streak += 1 - self.next_request_time += min(2 ** self.error_streak, - self.max_retry_wait) - raise - else: - self.error_streak = 0 - return result - return wrapper - - @_throttle_errors - def sync_node(self, cloud_node, arvados_node): - return self._cloud.sync_node(cloud_node, arvados_node) - - class ShutdownTimer(object): """Keep track of a cloud node's shutdown windows. @@ -309,87 +70,3 @@ class ShutdownTimer(object): def window_open(self): self._advance_opening() return 0 < (time.time() - self._open_start) < self._open_for - - -class ComputeNodeMonitorActor(config.actor_class): - """Actor to manage a running compute node. - - This actor gets updates about a compute node's cloud and Arvados records. - It uses this information to notify subscribers when the node is eligible - for shutdown. - """ - def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer, - timer_actor, update_actor, arvados_node=None, - poll_stale_after=600, node_stale_after=3600): - super(ComputeNodeMonitorActor, self).__init__() - self._later = self.actor_ref.proxy() - self._logger = logging.getLogger('arvnodeman.computenode') - self._last_log = None - self._shutdowns = shutdown_timer - self._timer = timer_actor - self._update = update_actor - self.cloud_node = cloud_node - self.cloud_node_start_time = cloud_node_start_time - self.poll_stale_after = poll_stale_after - self.node_stale_after = node_stale_after - self.subscribers = set() - self.arvados_node = None - self._later.update_arvados_node(arvados_node) - self.last_shutdown_opening = None - self._later.consider_shutdown() - - def subscribe(self, subscriber): - self.subscribers.add(subscriber) - - def _debug(self, msg, *args): - if msg == self._last_log: - return - self._last_log = msg - self._logger.debug(msg, *args) - - def _shutdown_eligible(self): - if self.arvados_node is None: - return timestamp_fresh(self.cloud_node_start_time, - self.node_stale_after) - else: - return (timestamp_fresh(arvados_node_mtime(self.arvados_node), - self.poll_stale_after) and - (self.arvados_node['info'].get('slurm_state') == 'idle')) - - def consider_shutdown(self): - next_opening = self._shutdowns.next_opening() - if self._shutdowns.window_open(): - if self._shutdown_eligible(): - self._debug("Node %s suggesting shutdown.", self.cloud_node.id) - _notify_subscribers(self._later, self.subscribers) - else: - self._debug("Node %s shutdown window open but node busy.", - self.cloud_node.id) - else: - self._debug("Node %s shutdown window closed. Next at %s.", - self.cloud_node.id, time.ctime(next_opening)) - if self.last_shutdown_opening != next_opening: - self._timer.schedule(next_opening, self._later.consider_shutdown) - self.last_shutdown_opening = next_opening - - def offer_arvados_pair(self, arvados_node): - if self.arvados_node is not None: - return None - elif arvados_node['ip_address'] in self.cloud_node.private_ips: - self._later.update_arvados_node(arvados_node) - return self.cloud_node.id - else: - return None - - def update_cloud_node(self, cloud_node): - if cloud_node is not None: - self.cloud_node = cloud_node - self._later.consider_shutdown() - - def update_arvados_node(self, arvados_node): - if arvados_node is not None: - self.arvados_node = arvados_node - new_hostname = arvados_node_fqdn(self.arvados_node) - if new_hostname != self.cloud_node.name: - self._update.sync_node(self.cloud_node, self.arvados_node) - self._later.consider_shutdown()