From: Brett Smith Date: Tue, 11 Nov 2014 22:23:14 +0000 (-0500) Subject: 4380: Reorganize arvnodeman.computenode. X-Git-Tag: 1.1.0~2005^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/b626a85eb86fd4909712852040cd305c71c37ee5?hp=-c 4380: Reorganize arvnodeman.computenode. This makes the hierarchy a little richer: * arvnodeman.computenode.driver has all the cloud driver wrappers. * arvnodeman.computenode.dispatch will be just like that, except it will consider local dispatch concerns. For example, I'm going to add a SLURM submodule here to take care of draining. * arvnodeman.computenode still has utility functions and ShutdownTimer. --- b626a85eb86fd4909712852040cd305c71c37ee5 diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py index 63effe9d37..4955992faa 100644 --- a/services/nodemanager/arvnodeman/computenode/__init__.py +++ b/services/nodemanager/arvnodeman/computenode/__init__.py @@ -2,16 +2,9 @@ from __future__ import absolute_import, print_function -import functools import itertools -import logging import time -import pykka - -from ..clientactor import _notify_subscribers -from .. import config - def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'): hostname = arvados_node.get('hostname') or default_hostname return '{}.{}'.format(hostname, arvados_node['domain']) @@ -23,252 +16,6 @@ def arvados_node_mtime(node): def timestamp_fresh(timestamp, fresh_time): return (time.time() - timestamp) < fresh_time -class BaseComputeNodeDriver(object): - """Abstract base class for compute node drivers. - - libcloud abstracts away many of the differences between cloud providers, - but managing compute nodes requires some cloud-specific features (e.g., - on EC2 we use tags to identify compute nodes). Compute node drivers - are responsible for translating the node manager's cloud requests to a - specific cloud's vocabulary. 
- - Subclasses must implement arvados_create_kwargs (to update node - creation kwargs with information about the specific Arvados node - record), sync_node, and node_start_time. - """ - def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class): - self.real = driver_class(**auth_kwargs) - self.list_kwargs = list_kwargs - self.create_kwargs = create_kwargs - - def __getattr__(self, name): - # Proxy non-extension methods to the real driver. - if (not name.startswith('_') and not name.startswith('ex_') - and hasattr(self.real, name)): - return getattr(self.real, name) - else: - return super(BaseComputeNodeDriver, self).__getattr__(name) - - def search_for(self, term, list_method, key=lambda item: item.id): - cache_key = (list_method, term) - if cache_key not in self.SEARCH_CACHE: - results = [item for item in getattr(self.real, list_method)() - if key(item) == term] - count = len(results) - if count != 1: - raise ValueError("{} returned {} results for '{}'".format( - list_method, count, term)) - self.SEARCH_CACHE[cache_key] = results[0] - return self.SEARCH_CACHE[cache_key] - - def list_nodes(self): - return self.real.list_nodes(**self.list_kwargs) - - def arvados_create_kwargs(self, arvados_node): - raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs") - - def create_node(self, size, arvados_node): - kwargs = self.create_kwargs.copy() - kwargs.update(self.arvados_create_kwargs(arvados_node)) - kwargs['size'] = size - return self.real.create_node(**kwargs) - - def sync_node(self, cloud_node, arvados_node): - # When a compute node first pings the API server, the API server - # will automatically assign some attributes on the corresponding - # node record, like hostname. This method should propagate that - # information back to the cloud node appropriately. 
- raise NotImplementedError("BaseComputeNodeDriver.sync_node") - - @classmethod - def node_start_time(cls, node): - raise NotImplementedError("BaseComputeNodeDriver.node_start_time") - - -ComputeNodeDriverClass = BaseComputeNodeDriver - -class ComputeNodeStateChangeBase(config.actor_class): - """Base class for actors that change a compute node's state. - - This base class takes care of retrying changes and notifying - subscribers when the change is finished. - """ - def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait): - super(ComputeNodeStateChangeBase, self).__init__() - self._later = self.actor_ref.proxy() - self._timer = timer_actor - self._logger = logging.getLogger(logger_name) - self.min_retry_wait = retry_wait - self.max_retry_wait = max_retry_wait - self.retry_wait = retry_wait - self.subscribers = set() - - @staticmethod - def _retry(errors): - """Retry decorator for an actor method that makes remote requests. - - Use this function to decorator an actor method, and pass in a - tuple of exceptions to catch. This decorator will schedule - retries of that method with exponential backoff if the - original method raises any of the given errors. 
- """ - def decorator(orig_func): - @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): - try: - orig_func(self, *args, **kwargs) - except errors as error: - self._logger.warning( - "Client error: %s - waiting %s seconds", - error, self.retry_wait) - self._timer.schedule(self.retry_wait, - getattr(self._later, - orig_func.__name__), - *args, **kwargs) - self.retry_wait = min(self.retry_wait * 2, - self.max_retry_wait) - else: - self.retry_wait = self.min_retry_wait - return wrapper - return decorator - - def _finished(self): - _notify_subscribers(self._later, self.subscribers) - self.subscribers = None - - def subscribe(self, subscriber): - if self.subscribers is None: - try: - subscriber(self._later) - except pykka.ActorDeadError: - pass - else: - self.subscribers.add(subscriber) - - -class ComputeNodeSetupActor(ComputeNodeStateChangeBase): - """Actor to create and set up a cloud compute node. - - This actor prepares an Arvados node record for a new compute node - (either creating one or cleaning one passed in), then boots the - actual compute node. It notifies subscribers when the cloud node - is successfully created (the last step in the process for Node - Manager to handle). 
- """ - def __init__(self, timer_actor, arvados_client, cloud_client, - cloud_size, arvados_node=None, - retry_wait=1, max_retry_wait=180): - super(ComputeNodeSetupActor, self).__init__( - 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait) - self._arvados = arvados_client - self._cloud = cloud_client - self.cloud_size = cloud_size - self.arvados_node = None - self.cloud_node = None - if arvados_node is None: - self._later.create_arvados_node() - else: - self._later.prepare_arvados_node(arvados_node) - - @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) - def create_arvados_node(self): - self.arvados_node = self._arvados.nodes().create(body={}).execute() - self._later.create_cloud_node() - - @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) - def prepare_arvados_node(self, node): - self.arvados_node = self._arvados.nodes().update( - uuid=node['uuid'], - body={'hostname': None, - 'ip_address': None, - 'slot_number': None, - 'first_ping_at': None, - 'last_ping_at': None, - 'info': {'ec2_instance_id': None, - 'last_action': "Prepared by Node Manager"}} - ).execute() - self._later.create_cloud_node() - - @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS) - def create_cloud_node(self): - self._logger.info("Creating cloud node with size %s.", - self.cloud_size.name) - self.cloud_node = self._cloud.create_node(self.cloud_size, - self.arvados_node) - self._logger.info("Cloud node %s created.", self.cloud_node.id) - self._finished() - - def stop_if_no_cloud_node(self): - if self.cloud_node is None: - self.stop() - - -class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): - """Actor to shut down a compute node. - - This actor simply destroys a cloud node, retrying as needed. 
- """ - def __init__(self, timer_actor, cloud_client, cloud_node, - retry_wait=1, max_retry_wait=180): - super(ComputeNodeShutdownActor, self).__init__( - 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait) - self._cloud = cloud_client - self.cloud_node = cloud_node - self._later.shutdown_node() - - @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS) - def shutdown_node(self): - self._cloud.destroy_node(self.cloud_node) - self._logger.info("Cloud node %s shut down.", self.cloud_node.id) - self._finished() - - -class ComputeNodeUpdateActor(config.actor_class): - """Actor to dispatch one-off cloud management requests. - - This actor receives requests for small cloud updates, and - dispatches them to a real driver. ComputeNodeMonitorActors use - this to perform maintenance tasks on themselves. Having a - dedicated actor for this gives us the opportunity to control the - flow of requests; e.g., by backing off when errors occur. - - This actor is most like a "traditional" Pykka actor: there's no - subscribing, but instead methods return real driver results. If - you're interested in those results, you should get them from the - Future that the proxy method returns. Be prepared to handle exceptions - from the cloud driver when you do. 
- """ - def __init__(self, cloud_factory, max_retry_wait=180): - super(ComputeNodeUpdateActor, self).__init__() - self._cloud = cloud_factory() - self.max_retry_wait = max_retry_wait - self.error_streak = 0 - self.next_request_time = time.time() - - def _throttle_errors(orig_func): - @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): - throttle_time = self.next_request_time - time.time() - if throttle_time > 0: - time.sleep(throttle_time) - self.next_request_time = time.time() - try: - result = orig_func(self, *args, **kwargs) - except config.CLOUD_ERRORS: - self.error_streak += 1 - self.next_request_time += min(2 ** self.error_streak, - self.max_retry_wait) - raise - else: - self.error_streak = 0 - return result - return wrapper - - @_throttle_errors - def sync_node(self, cloud_node, arvados_node): - return self._cloud.sync_node(cloud_node, arvados_node) - - class ShutdownTimer(object): """Keep track of a cloud node's shutdown windows. @@ -309,103 +56,3 @@ class ShutdownTimer(object): def window_open(self): self._advance_opening() return 0 < (time.time() - self._open_start) < self._open_for - - -class ComputeNodeMonitorActor(config.actor_class): - """Actor to manage a running compute node. - - This actor gets updates about a compute node's cloud and Arvados records. - It uses this information to notify subscribers when the node is eligible - for shutdown. 
- """ - def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer, - timer_actor, update_actor, arvados_node=None, - poll_stale_after=600, node_stale_after=3600): - super(ComputeNodeMonitorActor, self).__init__() - self._later = self.actor_ref.proxy() - self._logger = logging.getLogger('arvnodeman.computenode') - self._last_log = None - self._shutdowns = shutdown_timer - self._timer = timer_actor - self._update = update_actor - self.cloud_node = cloud_node - self.cloud_node_start_time = cloud_node_start_time - self.poll_stale_after = poll_stale_after - self.node_stale_after = node_stale_after - self.subscribers = set() - self.arvados_node = None - self._later.update_arvados_node(arvados_node) - self.last_shutdown_opening = None - self._later.consider_shutdown() - - def subscribe(self, subscriber): - self.subscribers.add(subscriber) - - def _debug(self, msg, *args): - if msg == self._last_log: - return - self._last_log = msg - self._logger.debug(msg, *args) - - def in_state(self, *states): - # Return a boolean to say whether or not our Arvados node record is in - # one of the given states. If state information is not - # available--because this node has no Arvados record, the record is - # stale, or the record has no state information--return None. - if (self.arvados_node is None) or not timestamp_fresh( - arvados_node_mtime(self.arvados_node), self.node_stale_after): - return None - state = self.arvados_node['info'].get('slurm_state') - if not state: - return None - result = state in states - if state == 'idle': - result = result and not self.arvados_node['job_uuid'] - return result - - def _shutdown_eligible(self): - if self.arvados_node is None: - # If this is a new, unpaired node, it's eligible for - # shutdown--we figure there was an error during bootstrap. 
- return timestamp_fresh(self.cloud_node_start_time, - self.node_stale_after) - else: - return self.in_state('idle') - - def consider_shutdown(self): - next_opening = self._shutdowns.next_opening() - if self._shutdowns.window_open(): - if self._shutdown_eligible(): - self._debug("Node %s suggesting shutdown.", self.cloud_node.id) - _notify_subscribers(self._later, self.subscribers) - else: - self._debug("Node %s shutdown window open but node busy.", - self.cloud_node.id) - else: - self._debug("Node %s shutdown window closed. Next at %s.", - self.cloud_node.id, time.ctime(next_opening)) - if self.last_shutdown_opening != next_opening: - self._timer.schedule(next_opening, self._later.consider_shutdown) - self.last_shutdown_opening = next_opening - - def offer_arvados_pair(self, arvados_node): - if self.arvados_node is not None: - return None - elif arvados_node['ip_address'] in self.cloud_node.private_ips: - self._later.update_arvados_node(arvados_node) - return self.cloud_node.id - else: - return None - - def update_cloud_node(self, cloud_node): - if cloud_node is not None: - self.cloud_node = cloud_node - self._later.consider_shutdown() - - def update_arvados_node(self, arvados_node): - if arvados_node is not None: - self.arvados_node = arvados_node - new_hostname = arvados_node_fqdn(self.arvados_node) - if new_hostname != self.cloud_node.name: - self._update.sync_node(self.cloud_node, self.arvados_node) - self._later.consider_shutdown() diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py new file mode 100644 index 0000000000..d613ef1371 --- /dev/null +++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py @@ -0,0 +1,294 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import functools +import logging +import time + +import pykka + +from .. 
import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh +from ...clientactor import _notify_subscribers +from ... import config + +class ComputeNodeStateChangeBase(config.actor_class): + """Base class for actors that change a compute node's state. + + This base class takes care of retrying changes and notifying + subscribers when the change is finished. + """ + def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait): + super(ComputeNodeStateChangeBase, self).__init__() + self._later = self.actor_ref.proxy() + self._timer = timer_actor + self._logger = logging.getLogger(logger_name) + self.min_retry_wait = retry_wait + self.max_retry_wait = max_retry_wait + self.retry_wait = retry_wait + self.subscribers = set() + + @staticmethod + def _retry(errors): + """Retry decorator for an actor method that makes remote requests. + + Use this function to decorate an actor method, and pass in a + tuple of exceptions to catch. This decorator will schedule + retries of that method with exponential backoff if the + original method raises any of the given errors.
+ """ + def decorator(orig_func): + @functools.wraps(orig_func) + def wrapper(self, *args, **kwargs): + try: + orig_func(self, *args, **kwargs) + except errors as error: + self._logger.warning( + "Client error: %s - waiting %s seconds", + error, self.retry_wait) + self._timer.schedule(self.retry_wait, + getattr(self._later, + orig_func.__name__), + *args, **kwargs) + self.retry_wait = min(self.retry_wait * 2, + self.max_retry_wait) + else: + self.retry_wait = self.min_retry_wait + return wrapper + return decorator + + def _finished(self): + _notify_subscribers(self._later, self.subscribers) + self.subscribers = None + + def subscribe(self, subscriber): + if self.subscribers is None: + try: + subscriber(self._later) + except pykka.ActorDeadError: + pass + else: + self.subscribers.add(subscriber) + + +class ComputeNodeSetupActor(ComputeNodeStateChangeBase): + """Actor to create and set up a cloud compute node. + + This actor prepares an Arvados node record for a new compute node + (either creating one or cleaning one passed in), then boots the + actual compute node. It notifies subscribers when the cloud node + is successfully created (the last step in the process for Node + Manager to handle). 
+ """ + def __init__(self, timer_actor, arvados_client, cloud_client, + cloud_size, arvados_node=None, + retry_wait=1, max_retry_wait=180): + super(ComputeNodeSetupActor, self).__init__( + 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait) + self._arvados = arvados_client + self._cloud = cloud_client + self.cloud_size = cloud_size + self.arvados_node = None + self.cloud_node = None + if arvados_node is None: + self._later.create_arvados_node() + else: + self._later.prepare_arvados_node(arvados_node) + + @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) + def create_arvados_node(self): + self.arvados_node = self._arvados.nodes().create(body={}).execute() + self._later.create_cloud_node() + + @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS) + def prepare_arvados_node(self, node): + self.arvados_node = self._arvados.nodes().update( + uuid=node['uuid'], + body={'hostname': None, + 'ip_address': None, + 'slot_number': None, + 'first_ping_at': None, + 'last_ping_at': None, + 'info': {'ec2_instance_id': None, + 'last_action': "Prepared by Node Manager"}} + ).execute() + self._later.create_cloud_node() + + @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS) + def create_cloud_node(self): + self._logger.info("Creating cloud node with size %s.", + self.cloud_size.name) + self.cloud_node = self._cloud.create_node(self.cloud_size, + self.arvados_node) + self._logger.info("Cloud node %s created.", self.cloud_node.id) + self._finished() + + def stop_if_no_cloud_node(self): + if self.cloud_node is None: + self.stop() + + +class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): + """Actor to shut down a compute node. + + This actor simply destroys a cloud node, retrying as needed. 
+ """ + def __init__(self, timer_actor, cloud_client, cloud_node, + retry_wait=1, max_retry_wait=180): + super(ComputeNodeShutdownActor, self).__init__( + 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait) + self._cloud = cloud_client + self.cloud_node = cloud_node + self._later.shutdown_node() + + @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS) + def shutdown_node(self): + self._cloud.destroy_node(self.cloud_node) + self._logger.info("Cloud node %s shut down.", self.cloud_node.id) + self._finished() + + +class ComputeNodeUpdateActor(config.actor_class): + """Actor to dispatch one-off cloud management requests. + + This actor receives requests for small cloud updates, and + dispatches them to a real driver. ComputeNodeMonitorActors use + this to perform maintenance tasks on themselves. Having a + dedicated actor for this gives us the opportunity to control the + flow of requests; e.g., by backing off when errors occur. + + This actor is most like a "traditional" Pykka actor: there's no + subscribing, but instead methods return real driver results. If + you're interested in those results, you should get them from the + Future that the proxy method returns. Be prepared to handle exceptions + from the cloud driver when you do. 
+ """ + def __init__(self, cloud_factory, max_retry_wait=180): + super(ComputeNodeUpdateActor, self).__init__() + self._cloud = cloud_factory() + self.max_retry_wait = max_retry_wait + self.error_streak = 0 + self.next_request_time = time.time() + + def _throttle_errors(orig_func): + @functools.wraps(orig_func) + def wrapper(self, *args, **kwargs): + throttle_time = self.next_request_time - time.time() + if throttle_time > 0: + time.sleep(throttle_time) + self.next_request_time = time.time() + try: + result = orig_func(self, *args, **kwargs) + except config.CLOUD_ERRORS: + self.error_streak += 1 + self.next_request_time += min(2 ** self.error_streak, + self.max_retry_wait) + raise + else: + self.error_streak = 0 + return result + return wrapper + + @_throttle_errors + def sync_node(self, cloud_node, arvados_node): + return self._cloud.sync_node(cloud_node, arvados_node) + + +class ComputeNodeMonitorActor(config.actor_class): + """Actor to manage a running compute node. + + This actor gets updates about a compute node's cloud and Arvados records. + It uses this information to notify subscribers when the node is eligible + for shutdown. 
+ """ + def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer, + timer_actor, update_actor, arvados_node=None, + poll_stale_after=600, node_stale_after=3600): + super(ComputeNodeMonitorActor, self).__init__() + self._later = self.actor_ref.proxy() + self._logger = logging.getLogger('arvnodeman.computenode') + self._last_log = None + self._shutdowns = shutdown_timer + self._timer = timer_actor + self._update = update_actor + self.cloud_node = cloud_node + self.cloud_node_start_time = cloud_node_start_time + self.poll_stale_after = poll_stale_after + self.node_stale_after = node_stale_after + self.subscribers = set() + self.arvados_node = None + self._later.update_arvados_node(arvados_node) + self.last_shutdown_opening = None + self._later.consider_shutdown() + + def subscribe(self, subscriber): + self.subscribers.add(subscriber) + + def _debug(self, msg, *args): + if msg == self._last_log: + return + self._last_log = msg + self._logger.debug(msg, *args) + + def in_state(self, *states): + # Return a boolean to say whether or not our Arvados node record is in + # one of the given states. If state information is not + # available--because this node has no Arvados record, the record is + # stale, or the record has no state information--return None. + if (self.arvados_node is None) or not timestamp_fresh( + arvados_node_mtime(self.arvados_node), self.node_stale_after): + return None + state = self.arvados_node['info'].get('slurm_state') + if not state: + return None + result = state in states + if state == 'idle': + result = result and not self.arvados_node['job_uuid'] + return result + + def _shutdown_eligible(self): + if self.arvados_node is None: + # If this is a new, unpaired node, it's eligible for + # shutdown--we figure there was an error during bootstrap. 
+ return timestamp_fresh(self.cloud_node_start_time, + self.node_stale_after) + else: + return self.in_state('idle') + + def consider_shutdown(self): + next_opening = self._shutdowns.next_opening() + if self._shutdowns.window_open(): + if self._shutdown_eligible(): + self._debug("Node %s suggesting shutdown.", self.cloud_node.id) + _notify_subscribers(self._later, self.subscribers) + else: + self._debug("Node %s shutdown window open but node busy.", + self.cloud_node.id) + else: + self._debug("Node %s shutdown window closed. Next at %s.", + self.cloud_node.id, time.ctime(next_opening)) + if self.last_shutdown_opening != next_opening: + self._timer.schedule(next_opening, self._later.consider_shutdown) + self.last_shutdown_opening = next_opening + + def offer_arvados_pair(self, arvados_node): + if self.arvados_node is not None: + return None + elif arvados_node['ip_address'] in self.cloud_node.private_ips: + self._later.update_arvados_node(arvados_node) + return self.cloud_node.id + else: + return None + + def update_cloud_node(self, cloud_node): + if cloud_node is not None: + self.cloud_node = cloud_node + self._later.consider_shutdown() + + def update_arvados_node(self, arvados_node): + if arvados_node is not None: + self.arvados_node = arvados_node + new_hostname = arvados_node_fqdn(self.arvados_node) + if new_hostname != self.cloud_node.name: + self._update.sync_node(self.cloud_node, self.arvados_node) + self._later.consider_shutdown() diff --git a/services/nodemanager/arvnodeman/computenode/driver/__init__.py b/services/nodemanager/arvnodeman/computenode/driver/__init__.py new file mode 100644 index 0000000000..a20cfde371 --- /dev/null +++ b/services/nodemanager/arvnodeman/computenode/driver/__init__.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +class BaseComputeNodeDriver(object): + """Abstract base class for compute node drivers. 
+ + libcloud abstracts away many of the differences between cloud providers, + but managing compute nodes requires some cloud-specific features (e.g., + on EC2 we use tags to identify compute nodes). Compute node drivers + are responsible for translating the node manager's cloud requests to a + specific cloud's vocabulary. + + Subclasses must implement arvados_create_kwargs (to update node + creation kwargs with information about the specific Arvados node + record), sync_node, and node_start_time. + """ + def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class): + self.real = driver_class(**auth_kwargs) + self.list_kwargs = list_kwargs + self.create_kwargs = create_kwargs + + def __getattr__(self, name): + # Proxy non-extension methods to the real driver. + if (not name.startswith('_') and not name.startswith('ex_') + and hasattr(self.real, name)): + return getattr(self.real, name) + else: + return super(BaseComputeNodeDriver, self).__getattr__(name) + + def search_for(self, term, list_method, key=lambda item: item.id): + cache_key = (list_method, term) + if cache_key not in self.SEARCH_CACHE: + results = [item for item in getattr(self.real, list_method)() + if key(item) == term] + count = len(results) + if count != 1: + raise ValueError("{} returned {} results for '{}'".format( + list_method, count, term)) + self.SEARCH_CACHE[cache_key] = results[0] + return self.SEARCH_CACHE[cache_key] + + def list_nodes(self): + return self.real.list_nodes(**self.list_kwargs) + + def arvados_create_kwargs(self, arvados_node): + raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs") + + def create_node(self, size, arvados_node): + kwargs = self.create_kwargs.copy() + kwargs.update(self.arvados_create_kwargs(arvados_node)) + kwargs['size'] = size + return self.real.create_node(**kwargs) + + def sync_node(self, cloud_node, arvados_node): + # When a compute node first pings the API server, the API server + # will automatically assign some 
attributes on the corresponding + # node record, like hostname. This method should propagate that + # information back to the cloud node appropriately. + raise NotImplementedError("BaseComputeNodeDriver.sync_node") + + @classmethod + def node_start_time(cls, node): + raise NotImplementedError("BaseComputeNodeDriver.node_start_time") diff --git a/services/nodemanager/arvnodeman/computenode/dummy.py b/services/nodemanager/arvnodeman/computenode/driver/dummy.py similarity index 96% rename from services/nodemanager/arvnodeman/computenode/dummy.py rename to services/nodemanager/arvnodeman/computenode/driver/dummy.py index 6c39feaf61..3a286bba01 100644 --- a/services/nodemanager/arvnodeman/computenode/dummy.py +++ b/services/nodemanager/arvnodeman/computenode/driver/dummy.py @@ -7,7 +7,8 @@ import time import libcloud.compute.providers as cloud_provider import libcloud.compute.types as cloud_types -from . import BaseComputeNodeDriver, arvados_node_fqdn +from . import BaseComputeNodeDriver +from .. import arvados_node_fqdn class ComputeNodeDriver(BaseComputeNodeDriver): """Compute node driver wrapper for libcloud's dummy driver. diff --git a/services/nodemanager/arvnodeman/computenode/ec2.py b/services/nodemanager/arvnodeman/computenode/driver/ec2.py similarity index 98% rename from services/nodemanager/arvnodeman/computenode/ec2.py rename to services/nodemanager/arvnodeman/computenode/driver/ec2.py index 359bed4d1c..c0992f7b96 100644 --- a/services/nodemanager/arvnodeman/computenode/ec2.py +++ b/services/nodemanager/arvnodeman/computenode/driver/ec2.py @@ -9,7 +9,8 @@ import libcloud.compute.providers as cloud_provider import libcloud.compute.types as cloud_types from libcloud.compute.drivers import ec2 as cloud_ec2 -from . import BaseComputeNodeDriver, arvados_node_fqdn +from . import BaseComputeNodeDriver +from .. import arvados_node_fqdn ### Monkeypatch libcloud to support AWS' new SecurityGroup API. 
# These classes can be removed when libcloud support specifying diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py index 754b9319f0..24fd828cf5 100644 --- a/services/nodemanager/arvnodeman/config.py +++ b/services/nodemanager/arvnodeman/config.py @@ -85,7 +85,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser): http=http) def new_cloud_client(self): - module = importlib.import_module('arvnodeman.computenode.' + + module = importlib.import_module('arvnodeman.computenode.driver.' + self.get('Cloud', 'provider')) auth_kwargs = self.get_section('Cloud Credentials') if 'timeout' in auth_kwargs: diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py index eaf10be324..7ff736fb86 100644 --- a/services/nodemanager/arvnodeman/daemon.py +++ b/services/nodemanager/arvnodeman/daemon.py @@ -9,6 +9,7 @@ import time import pykka from . import computenode as cnode +from .computenode import dispatch from .config import actor_class class _ComputeNodeRecord(object): @@ -96,9 +97,9 @@ class NodeManagerDaemonActor(actor_class): arvados_factory, cloud_factory, shutdown_windows, min_nodes, max_nodes, poll_stale_after=600, node_stale_after=7200, - node_setup_class=cnode.ComputeNodeSetupActor, - node_shutdown_class=cnode.ComputeNodeShutdownActor, - node_actor_class=cnode.ComputeNodeMonitorActor): + node_setup_class=dispatch.ComputeNodeSetupActor, + node_shutdown_class=dispatch.ComputeNodeShutdownActor, + node_actor_class=dispatch.ComputeNodeMonitorActor): super(NodeManagerDaemonActor, self).__init__() self._node_setup = node_setup_class self._node_shutdown = node_shutdown_class diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py index f4ad7163fa..d2f4afee06 100644 --- a/services/nodemanager/arvnodeman/launcher.py +++ b/services/nodemanager/arvnodeman/launcher.py @@ -12,9 +12,7 @@ import daemon import pykka from . 
import config as nmconfig -from .computenode import \ - ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \ - ShutdownTimer +from .computenode.dispatch import ComputeNodeUpdateActor from .daemon import NodeManagerDaemonActor from .jobqueue import JobQueueMonitorActor, ServerCalculator from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py index 5ced5f99bb..e22cccc5b0 100644 --- a/services/nodemanager/tests/test_computenode.py +++ b/services/nodemanager/tests/test_computenode.py @@ -6,137 +6,11 @@ import time import unittest import arvados.errors as arverror -import httplib2 import mock -import pykka import arvnodeman.computenode as cnode from . import testutil -class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase): - def make_mocks(self, arvados_effect=None, cloud_effect=None): - if arvados_effect is None: - arvados_effect = [testutil.arvados_node_mock()] - self.arvados_effect = arvados_effect - self.timer = testutil.MockTimer() - self.api_client = mock.MagicMock(name='api_client') - self.api_client.nodes().create().execute.side_effect = arvados_effect - self.api_client.nodes().update().execute.side_effect = arvados_effect - self.cloud_client = mock.MagicMock(name='cloud_client') - self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1) - - def make_actor(self, arv_node=None): - if not hasattr(self, 'timer'): - self.make_mocks(arvados_effect=[arv_node]) - self.setup_actor = cnode.ComputeNodeSetupActor.start( - self.timer, self.api_client, self.cloud_client, - testutil.MockSize(1), arv_node).proxy() - - def test_creation_without_arvados_node(self): - self.make_actor() - self.assertEqual(self.arvados_effect[-1], - self.setup_actor.arvados_node.get(self.TIMEOUT)) - self.assertTrue(self.api_client.nodes().create().execute.called) - 
self.assertEqual(self.cloud_client.create_node(), - self.setup_actor.cloud_node.get(self.TIMEOUT)) - - def test_creation_with_arvados_node(self): - self.make_actor(testutil.arvados_node_mock()) - self.assertEqual(self.arvados_effect[-1], - self.setup_actor.arvados_node.get(self.TIMEOUT)) - self.assertTrue(self.api_client.nodes().update().execute.called) - self.assertEqual(self.cloud_client.create_node(), - self.setup_actor.cloud_node.get(self.TIMEOUT)) - - def test_failed_calls_retried(self): - self.make_mocks([ - arverror.ApiError(httplib2.Response({'status': '500'}), ""), - testutil.arvados_node_mock(), - ]) - self.make_actor() - self.wait_for_assignment(self.setup_actor, 'cloud_node') - - def test_stop_when_no_cloud_node(self): - self.make_mocks( - arverror.ApiError(httplib2.Response({'status': '500'}), "")) - self.make_actor() - self.setup_actor.stop_if_no_cloud_node() - self.assertTrue( - self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT)) - - def test_no_stop_when_cloud_node(self): - self.make_actor() - self.wait_for_assignment(self.setup_actor, 'cloud_node') - self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT) - self.assertTrue(self.stop_proxy(self.setup_actor), - "actor was stopped by stop_if_no_cloud_node") - - def test_subscribe(self): - self.make_mocks( - arverror.ApiError(httplib2.Response({'status': '500'}), "")) - self.make_actor() - subscriber = mock.Mock(name='subscriber_mock') - self.setup_actor.subscribe(subscriber) - self.api_client.nodes().create().execute.side_effect = [ - testutil.arvados_node_mock()] - self.wait_for_assignment(self.setup_actor, 'cloud_node') - self.assertEqual(self.setup_actor.actor_ref.actor_urn, - subscriber.call_args[0][0].actor_ref.actor_urn) - - def test_late_subscribe(self): - self.make_actor() - subscriber = mock.Mock(name='subscriber_mock') - self.wait_for_assignment(self.setup_actor, 'cloud_node') - self.setup_actor.subscribe(subscriber).get(self.TIMEOUT) - self.stop_proxy(self.setup_actor) - 
self.assertEqual(self.setup_actor.actor_ref.actor_urn, - subscriber.call_args[0][0].actor_ref.actor_urn) - - -class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin, - unittest.TestCase): - def make_mocks(self, cloud_node=None): - self.timer = testutil.MockTimer() - self.cloud_client = mock.MagicMock(name='cloud_client') - if cloud_node is None: - cloud_node = testutil.cloud_node_mock() - self.cloud_node = cloud_node - - def make_actor(self, arv_node=None): - if not hasattr(self, 'timer'): - self.make_mocks() - self.shutdown_actor = cnode.ComputeNodeShutdownActor.start( - self.timer, self.cloud_client, self.cloud_node).proxy() - - def test_easy_shutdown(self): - self.make_actor() - self.shutdown_actor.cloud_node.get(self.TIMEOUT) - self.stop_proxy(self.shutdown_actor) - self.assertTrue(self.cloud_client.destroy_node.called) - - def test_late_subscribe(self): - self.make_actor() - subscriber = mock.Mock(name='subscriber_mock') - self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT) - self.stop_proxy(self.shutdown_actor) - self.assertEqual(self.shutdown_actor.actor_ref.actor_urn, - subscriber.call_args[0][0].actor_ref.actor_urn) - - -class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin, - unittest.TestCase): - def make_actor(self): - self.driver = mock.MagicMock(name='driver_mock') - self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy() - - def test_node_sync(self): - self.make_actor() - cloud_node = testutil.cloud_node_mock() - arv_node = testutil.arvados_node_mock() - self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT) - self.driver().sync_node.assert_called_with(cloud_node, arv_node) - - @mock.patch('time.time', return_value=1) class ShutdownTimerTestCase(unittest.TestCase): def test_two_length_window(self, time_mock): @@ -160,156 +34,3 @@ class ShutdownTimerTestCase(unittest.TestCase): time_mock.return_value += 200 self.assertEqual(961, timer.next_opening()) self.assertFalse(timer.window_open()) - - -class 
ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin, - unittest.TestCase): - class MockShutdownTimer(object): - def _set_state(self, is_open, next_opening): - self.window_open = lambda: is_open - self.next_opening = lambda: next_opening - - - def make_mocks(self, node_num): - self.shutdowns = self.MockShutdownTimer() - self.shutdowns._set_state(False, 300) - self.timer = mock.MagicMock(name='timer_mock') - self.updates = mock.MagicMock(name='update_mock') - self.cloud_mock = testutil.cloud_node_mock(node_num) - self.subscriber = mock.Mock(name='subscriber_mock') - - def make_actor(self, node_num=1, arv_node=None, start_time=None): - if not hasattr(self, 'cloud_mock'): - self.make_mocks(node_num) - if start_time is None: - start_time = time.time() - self.node_actor = cnode.ComputeNodeMonitorActor.start( - self.cloud_mock, start_time, self.shutdowns, self.timer, - self.updates, arv_node).proxy() - self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT) - - def node_state(self, *states): - return self.node_actor.in_state(*states).get(self.TIMEOUT) - - def test_in_state_when_unpaired(self): - self.make_actor() - self.assertIsNone(self.node_state('idle', 'alloc')) - - def test_in_state_when_pairing_stale(self): - self.make_actor(arv_node=testutil.arvados_node_mock( - job_uuid=None, age=90000)) - self.assertIsNone(self.node_state('idle', 'alloc')) - - def test_in_state_when_no_state_available(self): - self.make_actor(arv_node=testutil.arvados_node_mock(info={})) - self.assertIsNone(self.node_state('idle', 'alloc')) - - def test_in_idle_state(self): - self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None)) - self.assertTrue(self.node_state('idle')) - self.assertFalse(self.node_state('alloc')) - self.assertTrue(self.node_state('idle', 'alloc')) - - def test_in_alloc_state(self): - self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True)) - self.assertFalse(self.node_state('idle')) - self.assertTrue(self.node_state('alloc')) - 
self.assertTrue(self.node_state('idle', 'alloc')) - - def test_init_shutdown_scheduling(self): - self.make_actor() - self.assertTrue(self.timer.schedule.called) - self.assertEqual(300, self.timer.schedule.call_args[0][0]) - - def test_shutdown_subscription(self): - self.make_actor() - self.shutdowns._set_state(True, 600) - self.node_actor.consider_shutdown().get(self.TIMEOUT) - self.assertTrue(self.subscriber.called) - self.assertEqual(self.node_actor.actor_ref.actor_urn, - self.subscriber.call_args[0][0].actor_ref.actor_urn) - - def test_shutdown_without_arvados_node(self): - self.make_actor() - self.shutdowns._set_state(True, 600) - self.node_actor.consider_shutdown().get(self.TIMEOUT) - self.assertTrue(self.subscriber.called) - - def test_no_shutdown_without_arvados_node_and_old_cloud_node(self): - self.make_actor(start_time=0) - self.shutdowns._set_state(True, 600) - self.node_actor.consider_shutdown().get(self.TIMEOUT) - self.assertFalse(self.subscriber.called) - - def check_shutdown_rescheduled(self, window_open, next_window, - schedule_time=None): - self.shutdowns._set_state(window_open, next_window) - self.timer.schedule.reset_mock() - self.node_actor.consider_shutdown().get(self.TIMEOUT) - self.stop_proxy(self.node_actor) - self.assertTrue(self.timer.schedule.called) - if schedule_time is not None: - self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0]) - self.assertFalse(self.subscriber.called) - - def test_shutdown_window_close_scheduling(self): - self.make_actor() - self.check_shutdown_rescheduled(False, 600, 600) - - def test_no_shutdown_when_node_running_job(self): - self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True)) - self.check_shutdown_rescheduled(True, 600) - - def test_no_shutdown_when_node_state_unknown(self): - self.make_actor(5, testutil.arvados_node_mock(5, info={})) - self.check_shutdown_rescheduled(True, 600) - - def test_no_shutdown_when_node_state_stale(self): - self.make_actor(6, testutil.arvados_node_mock(6, 
age=90000)) - self.check_shutdown_rescheduled(True, 600) - - def test_arvados_node_match(self): - self.make_actor(2) - arv_node = testutil.arvados_node_mock( - 2, hostname='compute-two.zzzzz.arvadosapi.com') - pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT) - self.assertEqual(self.cloud_mock.id, pair_id) - self.stop_proxy(self.node_actor) - self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node) - - def test_arvados_node_mismatch(self): - self.make_actor(3) - arv_node = testutil.arvados_node_mock(1) - self.assertIsNone( - self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)) - - def test_update_cloud_node(self): - self.make_actor(1) - self.make_mocks(2) - self.cloud_mock.id = '1' - self.node_actor.update_cloud_node(self.cloud_mock) - current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT) - self.assertEqual([testutil.ip_address_mock(2)], - current_cloud.private_ips) - - def test_missing_cloud_node_update(self): - self.make_actor(1) - self.node_actor.update_cloud_node(None) - current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT) - self.assertEqual([testutil.ip_address_mock(1)], - current_cloud.private_ips) - - def test_update_arvados_node(self): - self.make_actor(3) - job_uuid = 'zzzzz-jjjjj-updatejobnode00' - new_arvados = testutil.arvados_node_mock(3, job_uuid) - self.node_actor.update_arvados_node(new_arvados) - current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT) - self.assertEqual(job_uuid, current_arvados['job_uuid']) - - def test_missing_arvados_node_update(self): - self.make_actor(4, testutil.arvados_node_mock(4)) - self.node_actor.update_arvados_node(None) - current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT) - self.assertEqual(testutil.ip_address_mock(4), - current_arvados['ip_address']) diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py new file mode 100644 index 0000000000..ece186bcb3 --- 
/dev/null +++ b/services/nodemanager/tests/test_computenode_dispatch.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import time +import unittest + +import arvados.errors as arverror +import httplib2 +import mock +import pykka + +import arvnodeman.computenode.dispatch as dispatch +from . import testutil + +class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase): + def make_mocks(self, arvados_effect=None, cloud_effect=None): + if arvados_effect is None: + arvados_effect = [testutil.arvados_node_mock()] + self.arvados_effect = arvados_effect + self.timer = testutil.MockTimer() + self.api_client = mock.MagicMock(name='api_client') + self.api_client.nodes().create().execute.side_effect = arvados_effect + self.api_client.nodes().update().execute.side_effect = arvados_effect + self.cloud_client = mock.MagicMock(name='cloud_client') + self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1) + + def make_actor(self, arv_node=None): + if not hasattr(self, 'timer'): + self.make_mocks(arvados_effect=[arv_node]) + self.setup_actor = dispatch.ComputeNodeSetupActor.start( + self.timer, self.api_client, self.cloud_client, + testutil.MockSize(1), arv_node).proxy() + + def test_creation_without_arvados_node(self): + self.make_actor() + self.assertEqual(self.arvados_effect[-1], + self.setup_actor.arvados_node.get(self.TIMEOUT)) + self.assertTrue(self.api_client.nodes().create().execute.called) + self.assertEqual(self.cloud_client.create_node(), + self.setup_actor.cloud_node.get(self.TIMEOUT)) + + def test_creation_with_arvados_node(self): + self.make_actor(testutil.arvados_node_mock()) + self.assertEqual(self.arvados_effect[-1], + self.setup_actor.arvados_node.get(self.TIMEOUT)) + self.assertTrue(self.api_client.nodes().update().execute.called) + self.assertEqual(self.cloud_client.create_node(), + self.setup_actor.cloud_node.get(self.TIMEOUT)) + + def test_failed_calls_retried(self): + 
self.make_mocks([ + arverror.ApiError(httplib2.Response({'status': '500'}), ""), + testutil.arvados_node_mock(), + ]) + self.make_actor() + self.wait_for_assignment(self.setup_actor, 'cloud_node') + + def test_stop_when_no_cloud_node(self): + self.make_mocks( + arverror.ApiError(httplib2.Response({'status': '500'}), "")) + self.make_actor() + self.setup_actor.stop_if_no_cloud_node() + self.assertTrue( + self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT)) + + def test_no_stop_when_cloud_node(self): + self.make_actor() + self.wait_for_assignment(self.setup_actor, 'cloud_node') + self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT) + self.assertTrue(self.stop_proxy(self.setup_actor), + "actor was stopped by stop_if_no_cloud_node") + + def test_subscribe(self): + self.make_mocks( + arverror.ApiError(httplib2.Response({'status': '500'}), "")) + self.make_actor() + subscriber = mock.Mock(name='subscriber_mock') + self.setup_actor.subscribe(subscriber) + self.api_client.nodes().create().execute.side_effect = [ + testutil.arvados_node_mock()] + self.wait_for_assignment(self.setup_actor, 'cloud_node') + self.assertEqual(self.setup_actor.actor_ref.actor_urn, + subscriber.call_args[0][0].actor_ref.actor_urn) + + def test_late_subscribe(self): + self.make_actor() + subscriber = mock.Mock(name='subscriber_mock') + self.wait_for_assignment(self.setup_actor, 'cloud_node') + self.setup_actor.subscribe(subscriber).get(self.TIMEOUT) + self.stop_proxy(self.setup_actor) + self.assertEqual(self.setup_actor.actor_ref.actor_urn, + subscriber.call_args[0][0].actor_ref.actor_urn) + + +class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin, + unittest.TestCase): + def make_mocks(self, cloud_node=None): + self.timer = testutil.MockTimer() + self.cloud_client = mock.MagicMock(name='cloud_client') + if cloud_node is None: + cloud_node = testutil.cloud_node_mock() + self.cloud_node = cloud_node + + def make_actor(self, arv_node=None): + if not hasattr(self, 'timer'): + 
self.make_mocks() + self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start( + self.timer, self.cloud_client, self.cloud_node).proxy() + + def test_easy_shutdown(self): + self.make_actor() + self.shutdown_actor.cloud_node.get(self.TIMEOUT) + self.stop_proxy(self.shutdown_actor) + self.assertTrue(self.cloud_client.destroy_node.called) + + def test_late_subscribe(self): + self.make_actor() + subscriber = mock.Mock(name='subscriber_mock') + self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT) + self.stop_proxy(self.shutdown_actor) + self.assertEqual(self.shutdown_actor.actor_ref.actor_urn, + subscriber.call_args[0][0].actor_ref.actor_urn) + + +class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin, + unittest.TestCase): + def make_actor(self): + self.driver = mock.MagicMock(name='driver_mock') + self.updater = dispatch.ComputeNodeUpdateActor.start(self.driver).proxy() + + def test_node_sync(self): + self.make_actor() + cloud_node = testutil.cloud_node_mock() + arv_node = testutil.arvados_node_mock() + self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT) + self.driver().sync_node.assert_called_with(cloud_node, arv_node) + + +class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin, + unittest.TestCase): + class MockShutdownTimer(object): + def _set_state(self, is_open, next_opening): + self.window_open = lambda: is_open + self.next_opening = lambda: next_opening + + + def make_mocks(self, node_num): + self.shutdowns = self.MockShutdownTimer() + self.shutdowns._set_state(False, 300) + self.timer = mock.MagicMock(name='timer_mock') + self.updates = mock.MagicMock(name='update_mock') + self.cloud_mock = testutil.cloud_node_mock(node_num) + self.subscriber = mock.Mock(name='subscriber_mock') + + def make_actor(self, node_num=1, arv_node=None, start_time=None): + if not hasattr(self, 'cloud_mock'): + self.make_mocks(node_num) + if start_time is None: + start_time = time.time() + self.node_actor = dispatch.ComputeNodeMonitorActor.start( + 
self.cloud_mock, start_time, self.shutdowns, self.timer, + self.updates, arv_node).proxy() + self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT) + + def node_state(self, *states): + return self.node_actor.in_state(*states).get(self.TIMEOUT) + + def test_in_state_when_unpaired(self): + self.make_actor() + self.assertIsNone(self.node_state('idle', 'alloc')) + + def test_in_state_when_pairing_stale(self): + self.make_actor(arv_node=testutil.arvados_node_mock( + job_uuid=None, age=90000)) + self.assertIsNone(self.node_state('idle', 'alloc')) + + def test_in_state_when_no_state_available(self): + self.make_actor(arv_node=testutil.arvados_node_mock(info={})) + self.assertIsNone(self.node_state('idle', 'alloc')) + + def test_in_idle_state(self): + self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None)) + self.assertTrue(self.node_state('idle')) + self.assertFalse(self.node_state('alloc')) + self.assertTrue(self.node_state('idle', 'alloc')) + + def test_in_alloc_state(self): + self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True)) + self.assertFalse(self.node_state('idle')) + self.assertTrue(self.node_state('alloc')) + self.assertTrue(self.node_state('idle', 'alloc')) + + def test_init_shutdown_scheduling(self): + self.make_actor() + self.assertTrue(self.timer.schedule.called) + self.assertEqual(300, self.timer.schedule.call_args[0][0]) + + def test_shutdown_subscription(self): + self.make_actor() + self.shutdowns._set_state(True, 600) + self.node_actor.consider_shutdown().get(self.TIMEOUT) + self.assertTrue(self.subscriber.called) + self.assertEqual(self.node_actor.actor_ref.actor_urn, + self.subscriber.call_args[0][0].actor_ref.actor_urn) + + def test_shutdown_without_arvados_node(self): + self.make_actor() + self.shutdowns._set_state(True, 600) + self.node_actor.consider_shutdown().get(self.TIMEOUT) + self.assertTrue(self.subscriber.called) + + def test_no_shutdown_without_arvados_node_and_old_cloud_node(self): + 
self.make_actor(start_time=0) + self.shutdowns._set_state(True, 600) + self.node_actor.consider_shutdown().get(self.TIMEOUT) + self.assertFalse(self.subscriber.called) + + def check_shutdown_rescheduled(self, window_open, next_window, + schedule_time=None): + self.shutdowns._set_state(window_open, next_window) + self.timer.schedule.reset_mock() + self.node_actor.consider_shutdown().get(self.TIMEOUT) + self.stop_proxy(self.node_actor) + self.assertTrue(self.timer.schedule.called) + if schedule_time is not None: + self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0]) + self.assertFalse(self.subscriber.called) + + def test_shutdown_window_close_scheduling(self): + self.make_actor() + self.check_shutdown_rescheduled(False, 600, 600) + + def test_no_shutdown_when_node_running_job(self): + self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True)) + self.check_shutdown_rescheduled(True, 600) + + def test_no_shutdown_when_node_state_unknown(self): + self.make_actor(5, testutil.arvados_node_mock(5, info={})) + self.check_shutdown_rescheduled(True, 600) + + def test_no_shutdown_when_node_state_stale(self): + self.make_actor(6, testutil.arvados_node_mock(6, age=90000)) + self.check_shutdown_rescheduled(True, 600) + + def test_arvados_node_match(self): + self.make_actor(2) + arv_node = testutil.arvados_node_mock( + 2, hostname='compute-two.zzzzz.arvadosapi.com') + pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT) + self.assertEqual(self.cloud_mock.id, pair_id) + self.stop_proxy(self.node_actor) + self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node) + + def test_arvados_node_mismatch(self): + self.make_actor(3) + arv_node = testutil.arvados_node_mock(1) + self.assertIsNone( + self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)) + + def test_update_cloud_node(self): + self.make_actor(1) + self.make_mocks(2) + self.cloud_mock.id = '1' + self.node_actor.update_cloud_node(self.cloud_mock) + current_cloud = 
self.node_actor.cloud_node.get(self.TIMEOUT) + self.assertEqual([testutil.ip_address_mock(2)], + current_cloud.private_ips) + + def test_missing_cloud_node_update(self): + self.make_actor(1) + self.node_actor.update_cloud_node(None) + current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT) + self.assertEqual([testutil.ip_address_mock(1)], + current_cloud.private_ips) + + def test_update_arvados_node(self): + self.make_actor(3) + job_uuid = 'zzzzz-jjjjj-updatejobnode00' + new_arvados = testutil.arvados_node_mock(3, job_uuid) + self.node_actor.update_arvados_node(new_arvados) + current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT) + self.assertEqual(job_uuid, current_arvados['job_uuid']) + + def test_missing_arvados_node_update(self): + self.make_actor(4, testutil.arvados_node_mock(4)) + self.node_actor.update_arvados_node(None) + current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT) + self.assertEqual(testutil.ip_address_mock(4), + current_arvados['ip_address']) diff --git a/services/nodemanager/tests/test_computenode_ec2.py b/services/nodemanager/tests/test_computenode_driver_ec2.py similarity index 98% rename from services/nodemanager/tests/test_computenode_ec2.py rename to services/nodemanager/tests/test_computenode_driver_ec2.py index d1c9e43fe4..fde103e10e 100644 --- a/services/nodemanager/tests/test_computenode_ec2.py +++ b/services/nodemanager/tests/test_computenode_driver_ec2.py @@ -7,7 +7,7 @@ import unittest import mock -import arvnodeman.computenode.ec2 as ec2 +import arvnodeman.computenode.driver.ec2 as ec2 from . 
import testutil class EC2ComputeNodeDriverTestCase(unittest.TestCase): diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py index 4bffd092a1..394fb88b80 100644 --- a/services/nodemanager/tests/test_daemon.py +++ b/services/nodemanager/tests/test_daemon.py @@ -8,8 +8,8 @@ import unittest import mock import pykka -import arvnodeman.computenode as nmcnode import arvnodeman.daemon as nmdaemon +from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor from . import testutil class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, @@ -39,7 +39,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT) def monitor_list(self): - return pykka.ActorRegistry.get_by_class(nmcnode.ComputeNodeMonitorActor) + return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor) def alive_monitor_count(self): return sum(1 for actor in self.monitor_list() if actor.is_alive())