#!/usr/bin/env python from __future__ import absolute_import, print_function import functools import logging import time import libcloud.common.types as cloud_types import pykka from .. import \ arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing from ...clientactor import _notify_subscribers from ... import config class ComputeNodeStateChangeBase(config.actor_class): """Base class for actors that change a compute node's state. This base class takes care of retrying changes and notifying subscribers when the change is finished. """ def __init__(self, logger_name, cloud_client, arvados_client, timer_actor, retry_wait, max_retry_wait): super(ComputeNodeStateChangeBase, self).__init__() self._later = self.actor_ref.proxy() self._logger = logging.getLogger(logger_name) self._cloud = cloud_client self._arvados = arvados_client self._timer = timer_actor self.min_retry_wait = retry_wait self.max_retry_wait = max_retry_wait self.retry_wait = retry_wait self.subscribers = set() @staticmethod def _retry(errors=()): """Retry decorator for an actor method that makes remote requests. Use this function to decorator an actor method, and pass in a tuple of exceptions to catch. This decorator will schedule retries of that method with exponential backoff if the original method raises a known cloud driver error, or any of the given exception types. """ def decorator(orig_func): @functools.wraps(orig_func) def retry_wrapper(self, *args, **kwargs): start_time = time.time() try: orig_func(self, *args, **kwargs) except Exception as error: if not (isinstance(error, errors) or self._cloud.is_cloud_exception(error)): raise self._logger.warning( "Client error: %s - waiting %s seconds", error, self.retry_wait) self._timer.schedule(start_time + self.retry_wait, getattr(self._later, orig_func.__name__), *args, **kwargs) self.retry_wait = min(self.retry_wait * 2, self.max_retry_wait) else: self.retry_wait = self.min_retry_wait return retry_wrapper return decorator def _finished(self): _notify_subscribers(self._later, self.subscribers) self.subscribers = None def subscribe(self, subscriber): if self.subscribers is None: try: subscriber(self._later) except pykka.ActorDeadError: pass else: self.subscribers.add(subscriber) def _clean_arvados_node(self, arvados_node, explanation): return self._arvados.nodes().update( uuid=arvados_node['uuid'], body={'hostname': None, 'ip_address': None, 'slot_number': None, 'first_ping_at': None, 'last_ping_at': None, 'info': {'ec2_instance_id': None, 'last_action': explanation}}, ).execute() class ComputeNodeSetupActor(ComputeNodeStateChangeBase): """Actor to create and set up a cloud compute node. This actor prepares an Arvados node record for a new compute node (either creating one or cleaning one passed in), then boots the actual compute node. It notifies subscribers when the cloud node is successfully created (the last step in the process for Node Manager to handle). """ def __init__(self, timer_actor, arvados_client, cloud_client, cloud_size, arvados_node=None, retry_wait=1, max_retry_wait=180): super(ComputeNodeSetupActor, self).__init__( 'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor, retry_wait, max_retry_wait) self.cloud_size = cloud_size self.arvados_node = None self.cloud_node = None if arvados_node is None: self._later.create_arvados_node() else: self._later.prepare_arvados_node(arvados_node) @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) def create_arvados_node(self): self.arvados_node = self._arvados.nodes().create(body={}).execute() self._later.create_cloud_node() @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) def prepare_arvados_node(self, node): self.arvados_node = self._clean_arvados_node( node, "Prepared by Node Manager") self._later.create_cloud_node() @ComputeNodeStateChangeBase._retry() def create_cloud_node(self): self._logger.info("Creating cloud node with size %s.", self.cloud_size.name) self.cloud_node = self._cloud.create_node(self.cloud_size, self.arvados_node) self._logger.info("Cloud node %s created.", self.cloud_node.id) self._later.post_create() @ComputeNodeStateChangeBase._retry() def post_create(self): self._cloud.post_create_node(self.cloud_node) self._logger.info("%s post-create work done.", self.cloud_node.id) self._finished() def stop_if_no_cloud_node(self): if self.cloud_node is not None: return False self.stop() return True class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): """Actor to shut down a compute node. This actor simply destroys a cloud node, retrying as needed. """ def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor, cancellable=True, retry_wait=1, max_retry_wait=180): # If a ShutdownActor is cancellable, it will ask the # ComputeNodeMonitorActor if it's still eligible before taking each # action, and stop the shutdown process if the node is no longer # eligible. Normal shutdowns based on job demand should be # cancellable; shutdowns based on node misbehavior should not. super(ComputeNodeShutdownActor, self).__init__( 'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor, retry_wait, max_retry_wait) self._monitor = node_monitor.proxy() self.cloud_node = self._monitor.cloud_node.get() self.cancellable = cancellable self.success = None def on_start(self): self._later.shutdown_node() def _arvados_node(self): return self._monitor.arvados_node.get() def _finished(self, success_flag=None): if success_flag is not None: self.success = success_flag return super(ComputeNodeShutdownActor, self)._finished() def cancel_shutdown(self): self._finished(success_flag=False) def _stop_if_window_closed(orig_func): @functools.wraps(orig_func) def stop_wrapper(self, *args, **kwargs): if (self.cancellable and (not self._monitor.shutdown_eligible().get())): self._logger.info( "Cloud node %s shutdown cancelled - no longer eligible.", self.cloud_node.id) self._later.cancel_shutdown() return None else: return orig_func(self, *args, **kwargs) return stop_wrapper @_stop_if_window_closed @ComputeNodeStateChangeBase._retry() def shutdown_node(self): if not self._cloud.destroy_node(self.cloud_node): # Force a retry. raise cloud_types.LibcloudError("destroy_node failed") self._logger.info("Cloud node %s shut down.", self.cloud_node.id) arv_node = self._arvados_node() if arv_node is None: self._finished(success_flag=True) else: self._later.clean_arvados_node(arv_node) @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) def clean_arvados_node(self, arvados_node): self._clean_arvados_node(arvados_node, "Shut down by Node Manager") self._finished(success_flag=True) # Make the decorator available to subclasses. _stop_if_window_closed = staticmethod(_stop_if_window_closed) class ComputeNodeUpdateActor(config.actor_class): """Actor to dispatch one-off cloud management requests. This actor receives requests for small cloud updates, and dispatches them to a real driver. ComputeNodeMonitorActors use this to perform maintenance tasks on themselves. Having a dedicated actor for this gives us the opportunity to control the flow of requests; e.g., by backing off when errors occur. This actor is most like a "traditional" Pykka actor: there's no subscribing, but instead methods return real driver results. If you're interested in those results, you should get them from the Future that the proxy method returns. Be prepared to handle exceptions from the cloud driver when you do. """ def __init__(self, cloud_factory, max_retry_wait=180): super(ComputeNodeUpdateActor, self).__init__() self._cloud = cloud_factory() self.max_retry_wait = max_retry_wait self.error_streak = 0 self.next_request_time = time.time() def _throttle_errors(orig_func): @functools.wraps(orig_func) def throttle_wrapper(self, *args, **kwargs): throttle_time = self.next_request_time - time.time() if throttle_time > 0: time.sleep(throttle_time) self.next_request_time = time.time() try: result = orig_func(self, *args, **kwargs) except Exception as error: self.error_streak += 1 self.next_request_time += min(2 ** self.error_streak, self.max_retry_wait) raise else: self.error_streak = 0 return result return throttle_wrapper @_throttle_errors def sync_node(self, cloud_node, arvados_node): return self._cloud.sync_node(cloud_node, arvados_node) class ComputeNodeMonitorActor(config.actor_class): """Actor to manage a running compute node. This actor gets updates about a compute node's cloud and Arvados records. It uses this information to notify subscribers when the node is eligible for shutdown. """ def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer, cloud_fqdn_func, timer_actor, update_actor, cloud_client, arvados_node=None, poll_stale_after=600, node_stale_after=3600, boot_fail_after=1800 ): super(ComputeNodeMonitorActor, self).__init__() self._later = self.actor_ref.proxy() self._logger = logging.getLogger('arvnodeman.computenode') self._last_log = None self._shutdowns = shutdown_timer self._cloud_node_fqdn = cloud_fqdn_func self._timer = timer_actor self._update = update_actor self._cloud = cloud_client self.cloud_node = cloud_node self.cloud_node_start_time = cloud_node_start_time self.poll_stale_after = poll_stale_after self.node_stale_after = node_stale_after self.boot_fail_after = boot_fail_after self.subscribers = set() self.arvados_node = None self._later.update_arvados_node(arvados_node) self.last_shutdown_opening = None self._later.consider_shutdown() def subscribe(self, subscriber): self.subscribers.add(subscriber) def _debug(self, msg, *args): if msg == self._last_log: return self._last_log = msg self._logger.debug(msg, *args) def in_state(self, *states): # Return a boolean to say whether or not our Arvados node record is in # one of the given states. If state information is not # available--because this node has no Arvados record, the record is # stale, or the record has no state information--return None. if (self.arvados_node is None) or not timestamp_fresh( arvados_node_mtime(self.arvados_node), self.node_stale_after): return None state = self.arvados_node['crunch_worker_state'] if not state: return None result = state in states if state == 'idle': result = result and not self.arvados_node['job_uuid'] return result def shutdown_eligible(self): if not self._shutdowns.window_open(): return False if self.arvados_node is None: # Node is unpaired. # If it hasn't pinged Arvados after boot_fail seconds, shut it down return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after) missing = arvados_node_missing(self.arvados_node, self.node_stale_after) if missing and self._cloud.broken(self.cloud_node): # Node is paired, but Arvados says it is missing and the cloud says the node # is in an error state, so shut it down. return True if missing is None and self._cloud.broken(self.cloud_node): self._logger.warning( "cloud reports broken node, but paired node %s never pinged " "(bug?) -- skipped check for node_stale_after", self.arvados_node['uuid']) return self.in_state('idle') def consider_shutdown(self): next_opening = self._shutdowns.next_opening() if self.shutdown_eligible(): self._debug("Node %s suggesting shutdown.", self.cloud_node.id) _notify_subscribers(self._later, self.subscribers) elif self._shutdowns.window_open(): self._debug("Node %s shutdown window open but node busy.", self.cloud_node.id) elif self.last_shutdown_opening != next_opening: self._debug("Node %s shutdown window closed. Next at %s.", self.cloud_node.id, time.ctime(next_opening)) self._timer.schedule(next_opening, self._later.consider_shutdown) self.last_shutdown_opening = next_opening def offer_arvados_pair(self, arvados_node): first_ping_s = arvados_node.get('first_ping_at') if (self.arvados_node is not None) or (not first_ping_s): return None elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)): self._later.update_arvados_node(arvados_node) return self.cloud_node.id else: return None def update_cloud_node(self, cloud_node): if cloud_node is not None: self.cloud_node = cloud_node self._later.consider_shutdown() def update_arvados_node(self, arvados_node): # If the cloud node's FQDN doesn't match what's in the Arvados node # record, make them match. # This method is a little unusual in the way it just fires off the # request without checking the result or retrying errors. That's # because this update happens every time we reload the Arvados node # list: if a previous sync attempt failed, we'll see that the names # are out of sync and just try again. ComputeNodeUpdateActor has # the logic to throttle those effective retries when there's trouble. if arvados_node is not None: self.arvados_node = arvados_node if (self._cloud_node_fqdn(self.cloud_node) != arvados_node_fqdn(self.arvados_node)): self._update.sync_node(self.cloud_node, self.arvados_node) self._later.consider_shutdown()