From: Brett Smith
Date: Fri, 3 Oct 2014 21:53:57 +0000 (-0400)
Subject: 2881: Add Node Manager service.
X-Git-Tag: 1.1.0~2118^2
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/a5687a390262abebfc16cf21e62052ac0019512d

2881: Add Node Manager service.
---

diff --git a/services/nodemanager/.gitignore b/services/nodemanager/.gitignore
new file mode 100644
index 0000000000..488ddd592a
--- /dev/null
+++ b/services/nodemanager/.gitignore
@@ -0,0 +1,4 @@
+*.pyc
+*.egg-info
+build/
+dist/
diff --git a/services/nodemanager/arvnodeman/__init__.py b/services/nodemanager/arvnodeman/__init__.py
new file mode 100644
index 0000000000..a1ecac7e46
--- /dev/null
+++ b/services/nodemanager/arvnodeman/__init__.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import _strptime  # See <http://bugs.python.org/issue7980>.
+import logging
+
+logger = logging.getLogger('arvnodeman')
+logger.addHandler(logging.NullHandler())
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
new file mode 100644
index 0000000000..77d85d640c
--- /dev/null
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import logging
+import time
+
+import pykka
+
+from .config import actor_class
+
+def _notify_subscribers(response, subscribers):
+    """Send the response to all the subscriber methods.
+
+    If any of the subscriber actors have stopped, remove them from the
+    subscriber set.
+    """
+    dead_subscribers = set()
+    for subscriber in subscribers:
+        try:
+            subscriber(response)
+        except pykka.ActorDeadError:
+            dead_subscribers.add(subscriber)
+    subscribers.difference_update(dead_subscribers)
+
+class RemotePollLoopActor(actor_class):
+    """Abstract actor class to regularly poll a remote service.
+
+    This actor sends regular requests to a remote service, and sends each
+    response to subscribers.  It takes care of error handling and retrying
+    requests with exponential backoff.
+
+    To use this actor, define CLIENT_ERRORS and the _send_request method.
+    If you also define an _item_key method, this class will support
+    subscribing to a specific item by key in responses.
+    """
+    def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
+        super(RemotePollLoopActor, self).__init__()
+        self._client = client
+        self._timer = timer_actor
+        self._logger = logging.getLogger(self.LOGGER_NAME)
+        self._later = self.actor_ref.proxy()
+        self.min_poll_wait = poll_wait
+        self.max_poll_wait = max_poll_wait
+        self.poll_wait = self.min_poll_wait
+        self.last_poll_time = None
+        self.all_subscribers = set()
+        self.key_subscribers = {}
+        if hasattr(self, '_item_key'):
+            self.subscribe_to = self._subscribe_to
+
+    def _start_polling(self):
+        if self.last_poll_time is None:
+            self.last_poll_time = time.time()
+            self._later.poll()
+
+    def subscribe(self, subscriber):
+        self.all_subscribers.add(subscriber)
+        self._logger.debug("%r subscribed to all events", subscriber)
+        self._start_polling()
+
+    # __init__ exposes this method to the proxy if the subclass defines
+    # _item_key.
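+    # For example, CloudNodeListMonitorActor in nodelist.py (part of this
+    # commit) enables keyed subscriptions simply by defining:
+    #
+    #     def _item_key(self, node):
+    #         return node.id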
+    def _subscribe_to(self, key, subscriber):
+        self.key_subscribers.setdefault(key, set()).add(subscriber)
+        self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+        self._start_polling()
+
+    def _send_request(self):
+        raise NotImplementedError("subclasses must implement request method")
+
+    def _got_response(self, response):
+        self.poll_wait = self.min_poll_wait
+        _notify_subscribers(response, self.all_subscribers)
+        if hasattr(self, '_item_key'):
+            items = {self._item_key(x): x for x in response}
+            for key, subscribers in self.key_subscribers.iteritems():
+                _notify_subscribers(items.get(key), subscribers)
+
+    def _got_error(self, error):
+        self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
+        self._logger.warning("Client error: %s - waiting %s seconds",
+                             error, self.poll_wait)
+
+    def poll(self):
+        start_time = time.time()
+        try:
+            response = self._send_request()
+        except self.CLIENT_ERRORS as error:
+            self.last_poll_time = start_time
+            self._got_error(error)
+        else:
+            self.last_poll_time += self.poll_wait
+            self._got_response(response)
+        self._timer.schedule(self.last_poll_time + self.poll_wait,
+                             self._later.poll)
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
new file mode 100644
index 0000000000..ae25428944
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -0,0 +1,383 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import functools
+import itertools
+import logging
+import time
+
+import pykka
+
+from ..clientactor import _notify_subscribers
+from .. import config
+
+def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
+    hostname = arvados_node.get('hostname') or default_hostname
+    return '{}.{}'.format(hostname, arvados_node['domain'])
+
+def arvados_node_mtime(node):
+    return time.mktime(time.strptime(node['modified_at'] + 'UTC',
+                                     '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
+
+def timestamp_fresh(timestamp, fresh_time):
+    return (time.time() - timestamp) < fresh_time
+
+def _retry(errors):
+    """Retry decorator for an actor method that makes remote requests.
+
+    Use this function to decorate an actor method, and pass in a tuple of
+    exceptions to catch.  This decorator will schedule retries of that method
+    with exponential backoff if the original method raises any of the given
+    errors.
+    """
+    def decorator(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            try:
+                orig_func(self, *args, **kwargs)
+            except errors as error:
+                self._logger.warning(
+                    "Client error: %s - waiting %s seconds",
+                    error, self.retry_wait)
+                # TimedCallBackActor.schedule expects an absolute time.
+                self._timer.schedule(time.time() + self.retry_wait,
+                                     getattr(self._later, orig_func.__name__),
+                                     *args, **kwargs)
+                self.retry_wait = min(self.retry_wait * 2,
+                                      self.max_retry_wait)
+            else:
+                self.retry_wait = self.min_retry_wait
+        return wrapper
+    return decorator
+
+class BaseComputeNodeDriver(object):
+    """Abstract base class for compute node drivers.
+
+    libcloud abstracts away many of the differences between cloud providers,
+    but managing compute nodes requires some cloud-specific features (e.g.,
+    on EC2 we use tags to identify compute nodes).  Compute node drivers
+    are responsible for translating the node manager's cloud requests to a
+    specific cloud's vocabulary.
+
+    Subclasses must implement arvados_create_kwargs (to update node
+    creation kwargs with information about the specific Arvados node
+    record), sync_node, and node_start_time.
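+
+    As an illustrative sketch only (dummy.py below is the real reference
+    implementation), a minimal subclass could look like:
+
+        class ComputeNodeDriver(BaseComputeNodeDriver):
+            def arvados_create_kwargs(self, arvados_node):
+                return {}
+
+            def sync_node(self, cloud_node, arvados_node):
+                cloud_node.name = arvados_node_fqdn(arvados_node)
+
+            @classmethod
+            def node_start_time(cls, node):
+                return time.time()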
+ """ + def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class): + self.real = driver_class(**auth_kwargs) + self.list_kwargs = list_kwargs + self.create_kwargs = create_kwargs + + def __getattr__(self, name): + # Proxy non-extension methods to the real driver. + if (not name.startswith('_') and not name.startswith('ex_') + and hasattr(self.real, name)): + return getattr(self.real, name) + else: + return super(BaseComputeNodeDriver, self).__getattr__(name) + + def search_for(self, term, list_method, key=lambda item: item.id): + cache_key = (list_method, term) + if cache_key not in self.SEARCH_CACHE: + results = [item for item in getattr(self.real, list_method)() + if key(item) == term] + count = len(results) + if count != 1: + raise ValueError("{} returned {} results for '{}'".format( + list_method, count, term)) + self.SEARCH_CACHE[cache_key] = results[0] + return self.SEARCH_CACHE[cache_key] + + def list_nodes(self): + return self.real.list_nodes(**self.list_kwargs) + + def arvados_create_kwargs(self, arvados_node): + raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs") + + def create_node(self, size, arvados_node): + kwargs = self.create_kwargs.copy() + kwargs.update(self.arvados_create_kwargs(arvados_node)) + kwargs['size'] = size + return self.real.create_node(**kwargs) + + def sync_node(self, cloud_node, arvados_node): + # When a compute node first pings the API server, the API server + # will automatically assign some attributes on the corresponding + # node record, like hostname. This method should propagate that + # information back to the cloud node appropriately. + raise NotImplementedError("BaseComputeNodeDriver.sync_node") + + @classmethod + def node_start_time(cls, node): + raise NotImplementedError("BaseComputeNodeDriver.node_start_time") + + +ComputeNodeDriverClass = BaseComputeNodeDriver + +class ComputeNodeSetupActor(config.actor_class): + """Actor to create and set up a cloud compute node. + + This actor prepares an Arvados node record for a new compute node + (either creating one or cleaning one passed in), then boots the + actual compute node. It notifies subscribers when the cloud node + is successfully created (the last step in the process for Node + Manager to handle). 
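+
+    Subscribers receive this actor's own proxy once the cloud node
+    exists; a consumer can then read the results off it, illustratively
+    (cf. the daemon's node_up):
+
+        def on_node_up(setup_proxy):
+            cloud_node = setup_proxy.cloud_node.get()
+            arvados_node = setup_proxy.arvados_node.get()
+            setup_proxy.stop()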
+ """ + def __init__(self, timer_actor, arvados_client, cloud_client, + cloud_size, arvados_node=None, + retry_wait=1, max_retry_wait=180): + super(ComputeNodeSetupActor, self).__init__() + self._timer = timer_actor + self._arvados = arvados_client + self._cloud = cloud_client + self._later = self.actor_ref.proxy() + self._logger = logging.getLogger('arvnodeman.nodeup') + self.cloud_size = cloud_size + self.subscribers = set() + self.min_retry_wait = retry_wait + self.max_retry_wait = max_retry_wait + self.retry_wait = retry_wait + self.arvados_node = None + self.cloud_node = None + if arvados_node is None: + self._later.create_arvados_node() + else: + self._later.prepare_arvados_node(arvados_node) + + @_retry(config.ARVADOS_ERRORS) + def create_arvados_node(self): + self.arvados_node = self._arvados.nodes().create(body={}).execute() + self._later.create_cloud_node() + + @_retry(config.ARVADOS_ERRORS) + def prepare_arvados_node(self, node): + self.arvados_node = self._arvados.nodes().update( + uuid=node['uuid'], + body={'hostname': None, + 'ip_address': None, + 'slot_number': None, + 'first_ping_at': None, + 'last_ping_at': None, + 'info': {'ec2_instance_id': None, + 'last_action': "Prepared by Node Manager"}} + ).execute() + self._later.create_cloud_node() + + @_retry(config.CLOUD_ERRORS) + def create_cloud_node(self): + self._logger.info("Creating cloud node with size %s.", + self.cloud_size.name) + self.cloud_node = self._cloud.create_node(self.cloud_size, + self.arvados_node) + self._logger.info("Cloud node %s created.", self.cloud_node.id) + _notify_subscribers(self._later, self.subscribers) + self.subscribers = None + + def stop_if_no_cloud_node(self): + if self.cloud_node is None: + self.stop() + + def subscribe(self, subscriber): + if self.subscribers is None: + try: + subscriber(self._later) + except pykka.ActorDeadError: + pass + else: + self.subscribers.add(subscriber) + + +class ComputeNodeShutdownActor(config.actor_class): + """Actor to shut down a compute node. + + This actor simply destroys a cloud node, retrying as needed. + """ + def __init__(self, timer_actor, cloud_client, cloud_node, + retry_wait=1, max_retry_wait=180): + super(ComputeNodeShutdownActor, self).__init__() + self._timer = timer_actor + self._cloud = cloud_client + self._later = self.actor_ref.proxy() + self._logger = logging.getLogger('arvnodeman.nodedown') + self.cloud_node = cloud_node + self.min_retry_wait = retry_wait + self.max_retry_wait = max_retry_wait + self.retry_wait = retry_wait + self._later.shutdown_node() + + @_retry(config.CLOUD_ERRORS) + def shutdown_node(self): + self._cloud.destroy_node(self.cloud_node) + self._logger.info("Cloud node %s shut down.", self.cloud_node.id) + + +class ComputeNodeUpdateActor(config.actor_class): + """Actor to dispatch one-off cloud management requests. + + This actor receives requests for small cloud updates, and + dispatches them to a real driver. ComputeNodeMonitorActors use + this to perform maintenance tasks on themselves. Having a + dedicated actor for this gives us the opportunity to control the + flow of requests; e.g., by backing off when errors occur. + + This actor is most like a "traditional" Pykka actor: there's no + subscribing, but instead methods return real driver results. If + you're interested in those results, you should get them from the + Future that the proxy method returns. Be prepared to handle exceptions + from the cloud driver when you do. 
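+
+    Illustratively:
+
+        update_actor = ComputeNodeUpdateActor.start(cloud_factory).proxy()
+        future = update_actor.sync_node(cloud_node, arvados_node)
+        future.get()  # re-raises any cloud driver exception here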
+    """
+    def __init__(self, cloud_factory, max_retry_wait=180):
+        super(ComputeNodeUpdateActor, self).__init__()
+        self._cloud = cloud_factory()
+        self.max_retry_wait = max_retry_wait
+        self.error_streak = 0
+        self.next_request_time = time.time()
+
+    def _throttle_errors(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            throttle_time = self.next_request_time - time.time()
+            if throttle_time > 0:
+                time.sleep(throttle_time)
+            self.next_request_time = time.time()
+            try:
+                result = orig_func(self, *args, **kwargs)
+            except config.CLOUD_ERRORS:
+                self.error_streak += 1
+                self.next_request_time += min(2 ** self.error_streak,
+                                              self.max_retry_wait)
+                raise
+            else:
+                self.error_streak = 0
+                return result
+        return wrapper
+
+    @_throttle_errors
+    def sync_node(self, cloud_node, arvados_node):
+        return self._cloud.sync_node(cloud_node, arvados_node)
+
+
+class ShutdownTimer(object):
+    """Keep track of a cloud node's shutdown windows.
+
+    Instantiate this class with a timestamp of when a cloud node started,
+    and a list of durations (in minutes) of when the node must not and may
+    be shut down, alternating.  The class will tell you when a shutdown
+    window is open, and when the next open window will start.
+    """
+    def __init__(self, start_time, shutdown_windows):
+        # The implementation is easiest if we have an even number of windows,
+        # because then windows always alternate between open and closed.
+        # Rig that up: calculate the first shutdown window based on what's
+        # passed in.  Then, if we were given an odd number of windows, merge
+        # that first window into the last one, since they both represent
+        # closed state.
+        first_window = shutdown_windows[0]
+        shutdown_windows = list(shutdown_windows[1:])
+        self._next_opening = start_time + (60 * first_window)
+        if len(shutdown_windows) % 2:
+            shutdown_windows.append(first_window)
+        else:
+            shutdown_windows[-1] += first_window
+        self.shutdown_windows = itertools.cycle([60 * n
+                                                 for n in shutdown_windows])
+        self._open_start = self._next_opening
+        self._open_for = next(self.shutdown_windows)
+
+    def _advance_opening(self):
+        while self._next_opening < time.time():
+            self._open_start = self._next_opening
+            self._next_opening += self._open_for + next(self.shutdown_windows)
+            self._open_for = next(self.shutdown_windows)
+
+    def next_opening(self):
+        self._advance_opening()
+        return self._next_opening
+
+    def window_open(self):
+        self._advance_opening()
+        return 0 < (time.time() - self._open_start) < self._open_for
+
+
+class ComputeNodeMonitorActor(config.actor_class):
+    """Actor to manage a running compute node.
+
+    This actor gets updates about a compute node's cloud and Arvados records.
+    It uses this information to notify subscribers when the node is eligible
+    for shutdown.
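+
+    Subscribers are called with this actor's proxy when shutdown looks
+    possible; the daemon, for instance, wires up
+    actor.subscribe(self._later.node_can_shutdown) for each new node.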
+ """ + def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer, + timer_actor, update_actor, arvados_node=None, + poll_stale_after=600, node_stale_after=3600): + super(ComputeNodeMonitorActor, self).__init__() + self._later = self.actor_ref.proxy() + self._logger = logging.getLogger('arvnodeman.computenode') + self._last_log = None + self._shutdowns = shutdown_timer + self._timer = timer_actor + self._update = update_actor + self.cloud_node = cloud_node + self.cloud_node_start_time = cloud_node_start_time + self.poll_stale_after = poll_stale_after + self.node_stale_after = node_stale_after + self.subscribers = set() + self.arvados_node = None + self._later.update_arvados_node(arvados_node) + self.last_shutdown_opening = None + self._later.consider_shutdown() + + def subscribe(self, subscriber): + self.subscribers.add(subscriber) + + def _debug(self, msg, *args): + if msg == self._last_log: + return + self._last_log = msg + self._logger.debug(msg, *args) + + def _shutdown_eligible(self): + if self.arvados_node is None: + return timestamp_fresh(self.cloud_node_start_time, + self.node_stale_after) + else: + return (timestamp_fresh(arvados_node_mtime(self.arvados_node), + self.poll_stale_after) and + (self.arvados_node['info'].get('slurm_state') == 'idle')) + + def consider_shutdown(self): + next_opening = self._shutdowns.next_opening() + if self._shutdowns.window_open(): + if self._shutdown_eligible(): + self._debug("Node %s suggesting shutdown.", self.cloud_node.id) + _notify_subscribers(self._later, self.subscribers) + else: + self._debug("Node %s shutdown window open but node busy.", + self.cloud_node.id) + else: + self._debug("Node %s shutdown window closed. Next at %s.", + self.cloud_node.id, time.ctime(next_opening)) + if self.last_shutdown_opening != next_opening: + self._timer.schedule(next_opening, self._later.consider_shutdown) + self.last_shutdown_opening = next_opening + + def offer_arvados_pair(self, arvados_node): + if self.arvados_node is not None: + return None + elif arvados_node['ip_address'] in self.cloud_node.private_ips: + self._later.update_arvados_node(arvados_node) + return self.cloud_node.id + else: + return None + + def update_cloud_node(self, cloud_node): + if cloud_node is not None: + self.cloud_node = cloud_node + self._later.consider_shutdown() + + def update_arvados_node(self, arvados_node): + if arvados_node is not None: + self.arvados_node = arvados_node + new_hostname = arvados_node_fqdn(self.arvados_node) + if new_hostname != self.cloud_node.name: + self._update.sync_node(self.cloud_node, self.arvados_node) + self._later.consider_shutdown() diff --git a/services/nodemanager/arvnodeman/computenode/dummy.py b/services/nodemanager/arvnodeman/computenode/dummy.py new file mode 100644 index 0000000000..6c39feaf61 --- /dev/null +++ b/services/nodemanager/arvnodeman/computenode/dummy.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import time + +import libcloud.compute.providers as cloud_provider +import libcloud.compute.types as cloud_types + +from . import BaseComputeNodeDriver, arvados_node_fqdn + +class ComputeNodeDriver(BaseComputeNodeDriver): + """Compute node driver wrapper for libcloud's dummy driver. + + This class provides the glue necessary to run the node manager with a + dummy cloud. It's useful for testing. 
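+
+    Select it with "provider = dummy" in the [Cloud] config section
+    (doc/local.example.cfg is a complete example).  Dummy nodes get
+    synthetic private IPs: a node with id 3 is assigned 10.10.0.3,
+    matching the ping simulation described in that example config.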
+ """ + DEFAULT_DRIVER = cloud_provider.get_driver(cloud_types.Provider.DUMMY) + DEFAULT_REAL = DEFAULT_DRIVER('ComputeNodeDriver') + DUMMY_START_TIME = time.time() + + def __init__(self, auth_kwargs, list_kwargs, create_kwargs, + driver_class=DEFAULT_DRIVER): + super(ComputeNodeDriver, self).__init__( + auth_kwargs, list_kwargs, create_kwargs, driver_class) + if driver_class is self.DEFAULT_DRIVER: + self.real = self.DEFAULT_REAL + + def _ensure_private_ip(self, node): + if not node.private_ips: + node.private_ips = ['10.10.0.{}'.format(node.id)] + + def arvados_create_kwargs(self, arvados_node): + return {} + + def list_nodes(self): + nodelist = super(ComputeNodeDriver, self).list_nodes() + for node in nodelist: + self._ensure_private_ip(node) + return nodelist + + def create_node(self, size, arvados_node): + node = super(ComputeNodeDriver, self).create_node(size, arvados_node) + self._ensure_private_ip(node) + return node + + def sync_node(self, cloud_node, arvados_node): + cloud_node.name = arvados_node_fqdn(arvados_node) + + @classmethod + def node_start_time(cls, node): + return cls.DUMMY_START_TIME diff --git a/services/nodemanager/arvnodeman/computenode/ec2.py b/services/nodemanager/arvnodeman/computenode/ec2.py new file mode 100644 index 0000000000..359bed4d1c --- /dev/null +++ b/services/nodemanager/arvnodeman/computenode/ec2.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import time + +import libcloud.compute.base as cloud_base +import libcloud.compute.providers as cloud_provider +import libcloud.compute.types as cloud_types +from libcloud.compute.drivers import ec2 as cloud_ec2 + +from . import BaseComputeNodeDriver, arvados_node_fqdn + +### Monkeypatch libcloud to support AWS' new SecurityGroup API. +# These classes can be removed when libcloud support specifying +# security groups with the SecurityGroupId parameter. +class ANMEC2Connection(cloud_ec2.EC2Connection): + def request(self, *args, **kwargs): + params = kwargs.get('params') + if (params is not None) and (params.get('Action') == 'RunInstances'): + for key in params.keys(): + if key.startswith('SecurityGroup.'): + new_key = key.replace('Group.', 'GroupId.', 1) + params[new_key] = params.pop(key).id + kwargs['params'] = params + return super(ANMEC2Connection, self).request(*args, **kwargs) + + +class ANMEC2NodeDriver(cloud_ec2.EC2NodeDriver): + connectionCls = ANMEC2Connection + + +class ComputeNodeDriver(BaseComputeNodeDriver): + """Compute node driver wrapper for EC2. + + This translates cloud driver requests to EC2's specific parameters. + """ + DEFAULT_DRIVER = ANMEC2NodeDriver +### End monkeypatch + SEARCH_CACHE = {} + + def __init__(self, auth_kwargs, list_kwargs, create_kwargs, + driver_class=DEFAULT_DRIVER): + # We need full lists of keys up front because these loops modify + # dictionaries in-place. 
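+        # e.g., the example config's "tag_cluster = zyxwv" becomes the EC2
+        # filter 'tag:cluster', and self.tags collects {'cluster': 'zyxwv'}
+        # so new nodes are created with the same tags we filter on.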
+ for key in list_kwargs.keys(): + list_kwargs[key.replace('_', ':')] = list_kwargs.pop(key) + self.tags = {key[4:]: value + for key, value in list_kwargs.iteritems() + if key.startswith('tag:')} + super(ComputeNodeDriver, self).__init__( + auth_kwargs, {'ex_filters': list_kwargs}, create_kwargs, + driver_class) + for key in self.create_kwargs.keys(): + init_method = getattr(self, '_init_' + key, None) + if init_method is not None: + new_pair = init_method(self.create_kwargs.pop(key)) + if new_pair is not None: + self.create_kwargs[new_pair[0]] = new_pair[1] + + def _init_image_id(self, image_id): + return 'image', self.search_for(image_id, 'list_images') + + def _init_ping_host(self, ping_host): + self.ping_host = ping_host + + def _init_security_groups(self, group_names): + return 'ex_security_groups', [ + self.search_for(gname.strip(), 'ex_get_security_groups') + for gname in group_names.split(',')] + + def _init_subnet_id(self, subnet_id): + return 'ex_subnet', self.search_for(subnet_id, 'ex_list_subnets') + + def _init_ssh_key(self, filename): + with open(filename) as ssh_file: + key = cloud_base.NodeAuthSSHKey(ssh_file.read()) + return 'auth', key + + def arvados_create_kwargs(self, arvados_node): + result = {'ex_metadata': self.tags.copy(), + 'name': arvados_node_fqdn(arvados_node)} + ping_secret = arvados_node['info'].get('ping_secret') + if ping_secret is not None: + ping_url = ('https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'. + format(self.ping_host, arvados_node['uuid'], + ping_secret)) + result['ex_userdata'] = ping_url + return result + + def sync_node(self, cloud_node, arvados_node): + metadata = self.arvados_create_kwargs(arvados_node) + tags = metadata['ex_metadata'] + tags['Name'] = metadata['name'] + self.real.ex_create_tags(cloud_node, tags) + + @classmethod + def node_start_time(cls, node): + time_str = node.extra['launch_time'].split('.', 2)[0] + 'UTC' + return time.mktime(time.strptime( + time_str,'%Y-%m-%dT%H:%M:%S%Z')) - time.timezone diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py new file mode 100644 index 0000000000..07504e2fe9 --- /dev/null +++ b/services/nodemanager/arvnodeman/config.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import ConfigParser +import importlib +import logging +import ssl + +import apiclient.errors as apierror +import arvados +import httplib2 +import libcloud.common.types as cloud_types +import pykka + +# IOError is the base class for socket.error and friends. +# It seems like it hits the sweet spot for operations we want to retry: +# it's low-level, but unlikely to catch code bugs. +NETWORK_ERRORS = (IOError, ssl.SSLError) +ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,) +CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,) + +actor_class = pykka.ThreadingActor + +class NodeManagerConfig(ConfigParser.SafeConfigParser): + """Node Manager Configuration class. + + This a standard Python ConfigParser, with additional helper methods to + create objects instantiated with configuration information. + """ + + LOGGING_NONLEVELS = frozenset(['file']) + + def __init__(self, *args, **kwargs): + # Can't use super() because SafeConfigParser is an old-style class. 
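+        # The defaults below only fill in sections and options the loaded
+        # configuration leaves unset; e.g., a file with no [Logging]
+        # section still gets level = WARNING.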
+ ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs) + for sec_name, settings in { + 'Arvados': {'insecure': 'no', + 'timeout': '15'}, + 'Daemon': {'max_nodes': '1', + 'poll_time': '60', + 'max_poll_time': '300', + 'poll_stale_after': '600', + 'node_stale_after': str(60 * 60 * 2)}, + 'Logging': {'file': '/dev/stderr', + 'level': 'WARNING'}, + }.iteritems(): + if not self.has_section(sec_name): + self.add_section(sec_name) + for opt_name, value in settings.iteritems(): + if not self.has_option(sec_name, opt_name): + self.set(sec_name, opt_name, value) + + def get_section(self, section, transformer=None): + result = self._dict() + for key, value in self.items(section): + if transformer is not None: + try: + value = transformer(value) + except (TypeError, ValueError): + pass + result[key] = value + return result + + def log_levels(self): + return {key: getattr(logging, self.get('Logging', key).upper()) + for key in self.options('Logging') + if key not in self.LOGGING_NONLEVELS} + + def new_arvados_client(self): + if self.has_option('Daemon', 'certs_file'): + certs_file = self.get('Daemon', 'certs_file') + else: + certs_file = None + insecure = self.getboolean('Arvados', 'insecure') + http = httplib2.Http(timeout=self.getint('Arvados', 'timeout'), + ca_certs=certs_file, + disable_ssl_certificate_validation=insecure) + return arvados.api('v1', + cache=False, # Don't reuse an existing client. + host=self.get('Arvados', 'host'), + token=self.get('Arvados', 'token'), + insecure=insecure, + http=http) + + def new_cloud_client(self): + module = importlib.import_module('arvnodeman.computenode.' + + self.get('Cloud', 'provider')) + auth_kwargs = self.get_section('Cloud Credentials') + if 'timeout' in auth_kwargs: + auth_kwargs['timeout'] = int(auth_kwargs['timeout']) + return module.ComputeNodeDriver(auth_kwargs, + self.get_section('Cloud List'), + self.get_section('Cloud Create')) + + def node_sizes(self, all_sizes): + size_kwargs = {} + for sec_name in self.sections(): + sec_words = sec_name.split(None, 2) + if sec_words[0] != 'Size': + continue + size_kwargs[sec_words[1]] = self.get_section(sec_name, int) + return [(size, size_kwargs[size.id]) for size in all_sizes + if size.id in size_kwargs] + + def shutdown_windows(self): + return [int(n) + for n in self.get('Cloud', 'shutdown_windows').split(',')] diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py new file mode 100644 index 0000000000..5b7437f8d3 --- /dev/null +++ b/services/nodemanager/arvnodeman/daemon.py @@ -0,0 +1,294 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import functools +import logging +import time + +import pykka + +from . 
import computenode as cnode +from .config import actor_class + +class _ComputeNodeRecord(object): + def __init__(self, actor=None, cloud_node=None, arvados_node=None, + assignment_time=float('-inf')): + self.actor = actor + self.cloud_node = cloud_node + self.arvados_node = arvados_node + self.assignment_time = assignment_time + + +class _BaseNodeTracker(object): + def __init__(self): + self.nodes = {} + self.orphans = {} + + def __getitem__(self, key): + return self.nodes[key] + + def __len__(self): + return len(self.nodes) + + def get(self, key, default=None): + return self.nodes.get(key, default) + + def record_key(self, record): + return self.item_key(getattr(record, self.RECORD_ATTR)) + + def add(self, record): + self.nodes[self.record_key(record)] = record + + def update_record(self, key, item): + setattr(self.nodes[key], self.RECORD_ATTR, item) + + def update_from(self, response): + unseen = set(self.nodes.iterkeys()) + for item in response: + key = self.item_key(item) + if key in unseen: + unseen.remove(key) + self.update_record(key, item) + else: + yield key, item + self.orphans = {key: self.nodes.pop(key) for key in unseen} + + def unpaired(self): + return (record for record in self.nodes.itervalues() + if getattr(record, self.PAIR_ATTR) is None) + + +class _CloudNodeTracker(_BaseNodeTracker): + RECORD_ATTR = 'cloud_node' + PAIR_ATTR = 'arvados_node' + item_key = staticmethod(lambda cloud_node: cloud_node.id) + + +class _ArvadosNodeTracker(_BaseNodeTracker): + RECORD_ATTR = 'arvados_node' + PAIR_ATTR = 'cloud_node' + item_key = staticmethod(lambda arvados_node: arvados_node['uuid']) + + def find_stale_node(self, stale_time): + for record in self.nodes.itervalues(): + node = record.arvados_node + if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node), + stale_time) and + not cnode.timestamp_fresh(record.assignment_time, + stale_time)): + return node + return None + + +class NodeManagerDaemonActor(actor_class): + """Node Manager daemon. + + This actor subscribes to all information polls about cloud nodes, + Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor + for every cloud node, subscribing them to poll updates + appropriately. It creates and destroys cloud nodes based on job queue + demand, and stops the corresponding ComputeNode actors when their work + is done. 
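+
+    The wiring happens in __init__: for each poller proxy passed in, the
+    daemon subscribes its own update_* method, so the wishlist poller,
+    for example, ends up calling update_server_wishlist on every poll.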
+ """ + def __init__(self, server_wishlist_actor, arvados_nodes_actor, + cloud_nodes_actor, cloud_update_actor, timer_actor, + arvados_factory, cloud_factory, + shutdown_windows, max_nodes, + poll_stale_after=600, node_stale_after=7200, + node_setup_class=cnode.ComputeNodeSetupActor, + node_shutdown_class=cnode.ComputeNodeShutdownActor, + node_actor_class=cnode.ComputeNodeMonitorActor): + super(NodeManagerDaemonActor, self).__init__() + self._node_setup = node_setup_class + self._node_shutdown = node_shutdown_class + self._node_actor = node_actor_class + self._cloud_updater = cloud_update_actor + self._timer = timer_actor + self._new_arvados = arvados_factory + self._new_cloud = cloud_factory + self._cloud_driver = self._new_cloud() + self._logger = logging.getLogger('arvnodeman.daemon') + self._later = self.actor_ref.proxy() + self.shutdown_windows = shutdown_windows + self.max_nodes = max_nodes + self.poll_stale_after = poll_stale_after + self.node_stale_after = node_stale_after + self.last_polls = {} + for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']: + poll_actor = locals()[poll_name + '_actor'] + poll_actor.subscribe(getattr(self._later, 'update_' + poll_name)) + setattr(self, '_{}_actor'.format(poll_name), poll_actor) + self.last_polls[poll_name] = -self.poll_stale_after + self.cloud_nodes = _CloudNodeTracker() + self.arvados_nodes = _ArvadosNodeTracker() + self.booting = {} # Actor IDs to ComputeNodeSetupActors + self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors + self._logger.debug("Daemon initialized") + + def _update_poll_time(self, poll_key): + self.last_polls[poll_key] = time.time() + + def _pair_nodes(self, node_record, arvados_node): + self._logger.info("Cloud node %s has associated with Arvados node %s", + node_record.cloud_node.id, arvados_node['uuid']) + self._arvados_nodes_actor.subscribe_to( + arvados_node['uuid'], node_record.actor.update_arvados_node) + node_record.arvados_node = arvados_node + self.arvados_nodes.add(node_record) + + def _new_node(self, cloud_node): + start_time = self._cloud_driver.node_start_time(cloud_node) + shutdown_timer = cnode.ShutdownTimer(start_time, + self.shutdown_windows) + actor = self._node_actor.start( + cloud_node=cloud_node, + cloud_node_start_time=start_time, + shutdown_timer=shutdown_timer, + update_actor=self._cloud_updater, + timer_actor=self._timer, + arvados_node=None, + poll_stale_after=self.poll_stale_after, + node_stale_after=self.node_stale_after).proxy() + actor.subscribe(self._later.node_can_shutdown) + self._cloud_nodes_actor.subscribe_to(cloud_node.id, + actor.update_cloud_node) + record = _ComputeNodeRecord(actor, cloud_node) + self.cloud_nodes.add(record) + return record + + def update_cloud_nodes(self, nodelist): + self._update_poll_time('cloud_nodes') + for key, node in self.cloud_nodes.update_from(nodelist): + self._logger.info("Registering new cloud node %s", key) + record = self._new_node(node) + for arv_rec in self.arvados_nodes.unpaired(): + if record.actor.offer_arvados_pair(arv_rec.arvados_node).get(): + self._pair_nodes(record, arv_rec.arvados_node) + break + for key, record in self.cloud_nodes.orphans.iteritems(): + record.actor.stop() + if key in self.shutdowns: + self.shutdowns.pop(key).stop() + + def update_arvados_nodes(self, nodelist): + self._update_poll_time('arvados_nodes') + for key, node in self.arvados_nodes.update_from(nodelist): + self._logger.info("Registering new Arvados node %s", key) + record = _ComputeNodeRecord(arvados_node=node) + 
self.arvados_nodes.add(record) + for arv_rec in self.arvados_nodes.unpaired(): + arv_node = arv_rec.arvados_node + for cloud_rec in self.cloud_nodes.unpaired(): + if cloud_rec.actor.offer_arvados_pair(arv_node).get(): + self._pair_nodes(cloud_rec, arv_node) + break + + def _node_count(self): + up = sum(len(nodelist) for nodelist in [self.cloud_nodes, self.booting]) + return up - len(self.shutdowns) + + def _nodes_wanted(self): + return len(self.last_wishlist) - self._node_count() + + def _nodes_excess(self): + return -self._nodes_wanted() + + def update_server_wishlist(self, wishlist): + self._update_poll_time('server_wishlist') + self.last_wishlist = wishlist[:self.max_nodes] + nodes_wanted = self._nodes_wanted() + if nodes_wanted > 0: + self._later.start_node() + elif (nodes_wanted < 0) and self.booting: + self._later.stop_booting_node() + + def _check_poll_freshness(orig_func): + """Decorator to inhibit a method when poll information is stale. + + This decorator checks the timestamps of all the poll information the + daemon has received. The decorated method is only called if none + of the timestamps are considered stale. + """ + @functools.wraps(orig_func) + def wrapper(self, *args, **kwargs): + now = time.time() + if all(now - t < self.poll_stale_after + for t in self.last_polls.itervalues()): + return orig_func(self, *args, **kwargs) + else: + return None + return wrapper + + @_check_poll_freshness + def start_node(self): + nodes_wanted = self._nodes_wanted() + if nodes_wanted < 1: + return None + arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after) + cloud_size = self.last_wishlist[nodes_wanted - 1] + self._logger.info("Want %s more nodes. Booting a %s node.", + nodes_wanted, cloud_size.name) + new_setup = self._node_setup.start( + timer_actor=self._timer, + arvados_client=self._new_arvados(), + arvados_node=arvados_node, + cloud_client=self._new_cloud(), + cloud_size=cloud_size).proxy() + self.booting[new_setup.actor_ref.actor_urn] = new_setup + if arvados_node is not None: + self.arvados_nodes[arvados_node['uuid']].assignment_time = ( + time.time()) + new_setup.subscribe(self._later.node_up) + if nodes_wanted > 1: + self._later.start_node() + + def _actor_nodes(self, node_actor): + return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node]) + + def node_up(self, setup_proxy): + cloud_node, arvados_node = self._actor_nodes(setup_proxy) + del self.booting[setup_proxy.actor_ref.actor_urn] + setup_proxy.stop() + record = self.cloud_nodes.get(cloud_node.id) + if record is None: + record = self._new_node(cloud_node) + self._pair_nodes(record, arvados_node) + + @_check_poll_freshness + def stop_booting_node(self): + nodes_excess = self._nodes_excess() + if (nodes_excess < 1) or not self.booting: + return None + for key, node in self.booting.iteritems(): + node.stop_if_no_cloud_node().get() + if not node.actor_ref.is_alive(): + del self.booting[key] + if nodes_excess > 1: + self._later.stop_booting_node() + break + + @_check_poll_freshness + def node_can_shutdown(self, node_actor): + if self._nodes_excess() < 1: + return None + cloud_node, arvados_node = self._actor_nodes(node_actor) + if cloud_node.id in self.shutdowns: + return None + shutdown = self._node_shutdown.start(timer_actor=self._timer, + cloud_client=self._new_cloud(), + cloud_node=cloud_node).proxy() + self.shutdowns[cloud_node.id] = shutdown + + def shutdown(self): + self._logger.info("Shutting down after signal.") + self.poll_stale_after = -1 # Inhibit starting/stopping nodes + for bootnode in 
self.booting.itervalues(): + bootnode.stop_if_no_cloud_node() + self._later.await_shutdown() + + def await_shutdown(self): + if any(node.actor_ref.is_alive() for node in self.booting.itervalues()): + self._timer.schedule(time.time() + 1, self._later.await_shutdown) + else: + self.stop() diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py new file mode 100644 index 0000000000..08ee12e1ad --- /dev/null +++ b/services/nodemanager/arvnodeman/jobqueue.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +from . import clientactor +from .config import ARVADOS_ERRORS + +class ServerCalculator(object): + """Generate cloud server wishlists from an Arvados job queue. + + Instantiate this class with a list of cloud node sizes you're willing to + use, plus keyword overrides from the configuration. Then you can pass + job queues to servers_for_queue. It will return a list of node sizes + that would best satisfy the jobs, choosing the cheapest size that + satisfies each job, and ignoring jobs that can't be satisfied. + """ + + class CloudSizeWrapper(object): + def __init__(self, real_size, **kwargs): + self.real = real_size + for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price', + 'extra']: + setattr(self, name, getattr(self.real, name)) + self.cores = kwargs.pop('cores') + self.scratch = self.disk + for name, override in kwargs.iteritems(): + if not hasattr(self, name): + raise ValueError("unrecognized size field '%s'" % (name,)) + setattr(self, name, override) + + def meets_constraints(self, **kwargs): + for name, want_value in kwargs.iteritems(): + have_value = getattr(self, name) + if (have_value != 0) and (have_value < want_value): + return False + return True + + + def __init__(self, server_list, max_nodes=None): + self.cloud_sizes = [self.CloudSizeWrapper(s, **kws) + for s, kws in server_list] + self.cloud_sizes.sort(key=lambda s: s.price) + self.max_nodes = max_nodes or float("inf") + + @staticmethod + def coerce_int(x, fallback): + try: + return int(x) + except (TypeError, ValueError): + return fallback + + def cloud_size_for_constraints(self, constraints): + want_value = lambda key: self.coerce_int(constraints.get(key), 0) + wants = {'cores': want_value('min_cores_per_node'), + 'ram': want_value('min_ram_mb_per_node'), + 'scratch': want_value('min_scratch_mb_per_node')} + for size in self.cloud_sizes: + if size.meets_constraints(**wants): + return size + return None + + def servers_for_queue(self, queue): + servers = [] + for job in queue: + constraints = job['runtime_constraints'] + want_count = self.coerce_int(constraints.get('min_nodes'), 1) + cloud_size = self.cloud_size_for_constraints(constraints) + if (want_count < self.max_nodes) and (cloud_size is not None): + servers.extend([cloud_size.real] * max(1, want_count)) + return servers + + +class JobQueueMonitorActor(clientactor.RemotePollLoopActor): + """Actor to generate server wishlists from the job queue. + + This actor regularly polls Arvados' job queue, and uses the provided + ServerCalculator to turn that into a list of requested node sizes. That + list is sent to subscribers on every poll. 
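+
+    As a worked example (illustrative numbers): a queued job whose
+    runtime_constraints include {'min_nodes': 2, 'min_ram_mb_per_node': 7000}
+    contributes two copies of the cheapest configured size with at least
+    7000 MB of RAM, and is skipped entirely if no size qualifies.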
+ """ + + CLIENT_ERRORS = ARVADOS_ERRORS + LOGGER_NAME = 'arvnodeman.jobqueue' + + def __init__(self, client, timer_actor, server_calc, *args, **kwargs): + super(JobQueueMonitorActor, self).__init__( + client, timer_actor, *args, **kwargs) + self._calculator = server_calc + + def _send_request(self): + return self._client.jobs().queue().execute()['items'] + + def _got_response(self, queue): + server_list = self._calculator.servers_for_queue(queue) + self._logger.debug("Sending server wishlist: %s", + ', '.join(s.name for s in server_list)) + return super(JobQueueMonitorActor, self)._got_response(server_list) diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py new file mode 100644 index 0000000000..87f2dda916 --- /dev/null +++ b/services/nodemanager/arvnodeman/launcher.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import argparse +import logging +import signal +import sys +import time + +import daemon +import pykka + +from . import config as nmconfig +from .computenode import \ + ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \ + ShutdownTimer +from .daemon import NodeManagerDaemonActor +from .jobqueue import JobQueueMonitorActor, ServerCalculator +from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor +from .timedcallback import TimedCallBackActor + +node_daemon = None + +def abort(msg, code=1): + print("arvados-node-manager: " + msg) + sys.exit(code) + +def parse_cli(args): + parser = argparse.ArgumentParser( + prog='arvados-node-manager', + description="Dynamically allocate Arvados cloud compute nodes") + parser.add_argument( + '--foreground', action='store_true', default=False, + help="Run in the foreground. 
Don't daemonize.") + parser.add_argument( + '--config', help="Path to configuration file") + return parser.parse_args(args) + +def load_config(path): + if not path: + abort("No --config file specified", 2) + config = nmconfig.NodeManagerConfig() + try: + with open(path) as config_file: + config.readfp(config_file) + except (IOError, OSError) as error: + abort("Error reading configuration file {}: {}".format(path, error)) + return config + +def setup_logging(path, level, **sublevels): + handler = logging.FileHandler(path) + handler.setFormatter(logging.Formatter( + '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s', + '%Y-%m-%d %H:%M:%S')) + root_logger = logging.getLogger() + root_logger.addHandler(handler) + root_logger.setLevel(level) + for logger_name, sublevel in sublevels.iteritems(): + sublogger = logging.getLogger(logger_name) + sublogger.setLevel(sublevel) + +def launch_pollers(config): + cloud_client = config.new_cloud_client() + arvados_client = config.new_arvados_client() + cloud_size_list = config.node_sizes(cloud_client.list_sizes()) + if not cloud_size_list: + abort("No valid node sizes configured") + + server_calculator = ServerCalculator( + cloud_size_list, config.getint('Daemon', 'max_nodes')) + poll_time = config.getint('Daemon', 'poll_time') + max_poll_time = config.getint('Daemon', 'max_poll_time') + + timer = TimedCallBackActor.start(poll_time / 10.0).proxy() + cloud_node_poller = CloudNodeListMonitorActor.start( + cloud_client, timer, poll_time, max_poll_time).proxy() + arvados_node_poller = ArvadosNodeListMonitorActor.start( + arvados_client, timer, poll_time, max_poll_time).proxy() + job_queue_poller = JobQueueMonitorActor.start( + config.new_arvados_client(), timer, server_calculator, + poll_time, max_poll_time).proxy() + return timer, cloud_node_poller, arvados_node_poller, job_queue_poller + +_caught_signals = {} +def shutdown_signal(signal_code, frame): + current_count = _caught_signals.get(signal_code, 0) + _caught_signals[signal_code] = current_count + 1 + if node_daemon is None: + pykka.ActorRegistry.stop_all() + sys.exit(-signal_code) + elif current_count == 0: + node_daemon.shutdown() + elif current_count == 1: + pykka.ActorRegistry.stop_all() + else: + sys.exit(-signal_code) + +def main(args=None): + global node_daemon + args = parse_cli(args) + config = load_config(args.config) + + if not args.foreground: + daemon.DaemonContext().open() + for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]: + signal.signal(sigcode, shutdown_signal) + + setup_logging(config.get('Logging', 'file'), **config.log_levels()) + timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \ + launch_pollers(config) + cloud_node_updater = ComputeNodeUpdateActor.start( + config.new_cloud_client).proxy() + node_daemon = NodeManagerDaemonActor.start( + job_queue_poller, arvados_node_poller, cloud_node_poller, + cloud_node_updater, timer, + config.new_arvados_client, config.new_cloud_client, + config.shutdown_windows(), config.getint('Daemon', 'max_nodes'), + config.getint('Daemon', 'poll_stale_after'), + config.getint('Daemon', 'node_stale_after')).proxy() + + signal.pause() + daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set + while not daemon_stopped(): + time.sleep(1) + pykka.ActorRegistry.stop_all() + + +if __name__ == '__main__': + main() diff --git a/services/nodemanager/arvnodeman/nodelist.py b/services/nodemanager/arvnodeman/nodelist.py new file mode 100644 index 0000000000..7ddfb7ca33 --- /dev/null +++ 
b/services/nodemanager/arvnodeman/nodelist.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +from . import clientactor +from . import config + +class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor): + """Actor to poll the Arvados node list. + + This actor regularly polls the list of Arvados node records, and + sends it to subscribers. + """ + + CLIENT_ERRORS = config.ARVADOS_ERRORS + LOGGER_NAME = 'arvnodeman.arvados_nodes' + + def _item_key(self, node): + return node['uuid'] + + def _send_request(self): + return self._client.nodes().list(limit=10000).execute()['items'] + + +class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor): + """Actor to poll the cloud node list. + + This actor regularly polls the cloud to get a list of running compute + nodes, and sends it to subscribers. + """ + + CLIENT_ERRORS = config.CLOUD_ERRORS + LOGGER_NAME = 'arvnodeman.cloud_nodes' + + def _item_key(self, node): + return node.id + + def _send_request(self): + return self._client.list_nodes() diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py new file mode 100644 index 0000000000..a1df8ec17b --- /dev/null +++ b/services/nodemanager/arvnodeman/timedcallback.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import heapq +import time + +import pykka + +from .config import actor_class + +class TimedCallBackActor(actor_class): + """Send messages to other actors on a schedule. + + Other actors can call the schedule() method to schedule delivery of a + message at a later time. This actor runs the necessary event loop for + delivery. + """ + def __init__(self, max_sleep=1): + super(TimedCallBackActor, self).__init__() + self._proxy = self.actor_ref.proxy() + self.messages = [] + self.max_sleep = max_sleep + + def schedule(self, delivery_time, receiver, *args, **kwargs): + heapq.heappush(self.messages, (delivery_time, receiver, args, kwargs)) + self._proxy.deliver() + + def deliver(self): + if not self.messages: + return None + til_next = self.messages[0][0] - time.time() + if til_next < 0: + t, receiver, args, kwargs = heapq.heappop(self.messages) + try: + receiver(*args, **kwargs) + except pykka.ActorDeadError: + pass + else: + time.sleep(min(til_next, self.max_sleep)) + self._proxy.deliver() diff --git a/services/nodemanager/bin/arvados-node-manager b/services/nodemanager/bin/arvados-node-manager new file mode 100644 index 0000000000..3a912887a2 --- /dev/null +++ b/services/nodemanager/bin/arvados-node-manager @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +from arvnodeman.launcher import main +main() diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg new file mode 100644 index 0000000000..a56e69eea6 --- /dev/null +++ b/services/nodemanager/doc/ec2.example.cfg @@ -0,0 +1,121 @@ +# EC2 configuration for Arvados Node Manager. +# All times are in seconds unless specified otherwise. + +[Daemon] +# Node Manager will not start any compute nodes when at least this +# many are running. +max_nodes = 8 + +# Poll EC2 nodes and Arvados for new information every N seconds. +poll_time = 60 + +# Polls have exponential backoff when services fail to respond. +# This is the longest time to wait between polls. 
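+# With poll_time = 60, failed polls back off as 60, 120, 240, ... seconds;
+# this value caps the wait (here: five minutes).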
+max_poll_time = 300
+
+# If Node Manager can't successfully poll a service for this long,
+# it will never start or stop compute nodes, on the assumption that its
+# information is too outdated.
+poll_stale_after = 600
+
+# "Node stale time" affects two related behaviors.
+# 1. If a compute node has been running for at least this long, but it
+# isn't paired with an Arvados node, do not shut it down, but leave it alone.
+# This prevents the node manager from shutting down a node that might
+# actually be doing work, but is having temporary trouble contacting the
+# API server.
+# 2. When the Node Manager starts a new compute node, it will try to reuse
+# an Arvados node that hasn't been updated for this long.
+node_stale_after = 14400
+
+# File path for Certificate Authorities
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+# Log file path
+file = /var/log/arvados/node-manager.log
+
+# Log level for most Node Manager messages.
+# Choose one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.
+# WARNING lets you know when polling a service fails.
+# INFO additionally lets you know when a compute node is started or stopped.
+level = INFO
+
+# You can also set different log levels for specific libraries.
+# Pykka is the Node Manager's actor library.
+# Setting this to DEBUG will display tracebacks for uncaught
+# exceptions in the actors, but it's also very chatty.
+pykka = WARNING
+
+# Setting apiclient to INFO will log the URL of every Arvados API request.
+apiclient = WARNING
+
+[Arvados]
+host = zyxwv.arvadosapi.com
+token = ARVADOS_TOKEN
+timeout = 15
+
+# Accept an untrusted SSL certificate from the API server?
+insecure = no
+
+[Cloud]
+provider = ec2
+
+# It's usually most cost-effective to shut down compute nodes during narrow
+# windows of time.  For example, EC2 bills each node by the hour, so the best
+# time to shut down a node is right before a new hour of uptime starts.
+# Shutdown windows define these periods of time.  These are windows in
+# full minutes, separated by commas.  Counting from the time the node is
+# booted, the node WILL NOT shut down for N1 minutes; then it MAY shut down
+# for N2 minutes; then it WILL NOT shut down for N3 minutes; and so on.
+# For example, "54, 5, 1" means the node may shut down from the 54th to the
+# 59th minute of each hour of uptime.
+# Specify at least two windows.  You can add as many as you need beyond that.
+shutdown_windows = 54, 5, 1
+
+[Cloud Credentials]
+key = KEY
+secret = SECRET_KEY
+region = us-east-1
+timeout = 60
+
+[Cloud List]
+# This section defines filters that find compute nodes.
+# Tags that you specify here will automatically be added to nodes you create.
+# Replace colons in Amazon filters with underscores
+# (e.g., write "tag:mytag" as "tag_mytag").
+instance-state-name = running
+tag_arvados-class = dynamic-compute
+tag_cluster = zyxwv
+
+[Cloud Create]
+# New compute nodes will send pings to Arvados at this host.
+# You may specify a port, and use brackets to disambiguate IPv6 addresses.
+ping_host = hostname:port
+
+# Give the name of an SSH key on AWS...
+ex_keyname = string
+
+# ... or a file path for an SSH key that can log in to the compute node.
+# (One or the other, not both.)
+# ssh_key = path
+
+# The EC2 IDs of the image and subnet that compute nodes should use.
+image_id = idstring
+subnet_id = idstring
+
+# Comma-separated EC2 IDs for the security group(s) assigned to each
+# compute node.
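+# Each ID is looked up through the EC2 API at startup; the lookup fails
+# unless it matches exactly one security group.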
+security_groups = idstring1, idstring2 + +[Size t2.medium] +# You can define any number of Size sections to list EC2 sizes you're +# willing to use. The Node Manager should boot the cheapest size(s) that +# can run jobs in the queue (N.B.: defining more than one size has not been +# tested yet). +# Each size section MUST define the number of cores it has. You may also +# want to define the number of mebibytes of scratch space for Crunch jobs. +# You can also override Amazon's provided data fields by setting the same +# names here. +cores = 2 +scratch = 100 \ No newline at end of file diff --git a/services/nodemanager/doc/local.example.cfg b/services/nodemanager/doc/local.example.cfg new file mode 100644 index 0000000000..8a6e626907 --- /dev/null +++ b/services/nodemanager/doc/local.example.cfg @@ -0,0 +1,41 @@ +# You can use this configuration to run a development Node Manager for +# testing. It uses libcloud's dummy driver and your own development API server. +# When new cloud nodes are created, you'll need to simulate the ping that +# they send to the Arvados API server. The easiest way I've found to do that +# is through the API server Rails console: load the Node object, set its +# IP address to 10.10.0.N (where N is the cloud node's ID), and save. + +[Daemon] +max_nodes = 8 +poll_time = 15 +max_poll_time = 60 +poll_stale_after = 600 +node_stale_after = 300 +certs_file = /etc/ssl/certs/ca-certificates.crt + +[Logging] +level = DEBUG +pykka = DEBUG +apiclient = WARNING + +[Arvados] +host = localhost:3030 +# This is the token for the text fixture's admin user. +token = 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h +insecure = yes +timeout = 15 + +[Cloud] +provider = dummy +shutdown_windows = 1, 1 +timeout = 15 + +[Cloud Credentials] +creds = dummycreds + +[Cloud List] +[Cloud Create] + +[Size 2] +cores = 4 +scratch = 1234 diff --git a/services/nodemanager/setup.py b/services/nodemanager/setup.py new file mode 100644 index 0000000000..fabb883d42 --- /dev/null +++ b/services/nodemanager/setup.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python + +import os +import subprocess +import time + +from setuptools import setup, find_packages + +SETUP_DIR = os.path.dirname(__file__) or "." 
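+# Version-stamp builds from the last git commit touching this directory:
+# when git metadata is available, tag_build becomes '.<commit time>.<hash>'
+# (e.g., '.20141003215357.a5687a3' for this commit); otherwise the plain
+# version below is used.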
+cmd_opts = {'egg_info': {}} +try: + git_tags = subprocess.check_output( + ['git', 'log', '--first-parent', '--max-count=1', + '--format=format:%ct %h', SETUP_DIR]).split() + assert len(git_tags) == 2 +except (AssertionError, OSError, subprocess.CalledProcessError): + pass +else: + git_tags[0] = time.strftime('%Y%m%d%H%M%S', time.gmtime(int(git_tags[0]))) + cmd_opts['egg_info']['tag_build'] = '.{}.{}'.format(*git_tags) + +setup(name='arvados-node-manager', + version='0.1', + description='Arvados compute node manager', + author='Arvados', + author_email='info@arvados.org', + url="https://arvados.org", + license='GNU Affero General Public License, version 3.0', + packages=find_packages(), + install_requires=[ + 'apache-libcloud', + 'arvados-python-client', + 'pykka', + 'python-daemon', + ], + scripts=['bin/arvados-node-manager'], + test_suite='tests', + tests_require=['mock>=1.0'], + zip_safe=False, + options=cmd_opts, + ) diff --git a/services/nodemanager/tests/__init__.py b/services/nodemanager/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/services/nodemanager/tests/test_clientactor.py b/services/nodemanager/tests/test_clientactor.py new file mode 100644 index 0000000000..0db0a33e3d --- /dev/null +++ b/services/nodemanager/tests/test_clientactor.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import unittest + +import mock +import pykka + +import arvnodeman.clientactor as clientactor +from . import testutil + +class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin, + unittest.TestCase): + class MockClientError(Exception): + pass + + class TestActor(clientactor.RemotePollLoopActor): + LOGGER_NAME = 'arvnodeman.testpoll' + + def _send_request(self): + return self._client() + TestActor.CLIENT_ERRORS = (MockClientError,) + TEST_CLASS = TestActor + + + def build_monitor(self, side_effect, *args, **kwargs): + super(RemotePollLoopActorTestCase, self).build_monitor(*args, **kwargs) + self.client.side_effect = side_effect + + def test_poll_loop_starts_after_subscription(self): + self.build_monitor(['test1']) + self.monitor.subscribe(self.subscriber) + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with('test1') + self.wait_for_call(self.timer.schedule) + + def test_poll_loop_continues_after_failure(self): + self.build_monitor(self.MockClientError) + self.monitor.subscribe(self.subscriber) + self.wait_for_call(self.timer.schedule) + self.assertTrue(self.monitor.actor_ref.is_alive(), + "poll loop died after error") + self.assertFalse(self.subscriber.called, + "poll loop notified subscribers after error") + + def test_late_subscribers_get_responses(self): + self.build_monitor(['late_test']) + self.monitor.subscribe(lambda response: None) + self.monitor.subscribe(self.subscriber) + self.monitor.poll() + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with('late_test') + + def test_survive_dead_subscriptions(self): + self.build_monitor(['survive1', 'survive2']) + dead_subscriber = mock.Mock(name='dead_subscriber') + dead_subscriber.side_effect = pykka.ActorDeadError + self.monitor.subscribe(dead_subscriber) + self.wait_for_call(dead_subscriber) + self.monitor.subscribe(self.subscriber) + self.monitor.poll() + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with('survive2') + self.assertTrue(self.monitor.actor_ref.is_alive(), + "poll loop died from dead subscriber") + + def test_no_subscriptions_by_key_without_support(self): + 
self.build_monitor([]) + with self.assertRaises(AttributeError): + self.monitor.subscribe_to('key') + + +class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin, + unittest.TestCase): + class TestActor(RemotePollLoopActorTestCase.TestActor): + def _item_key(self, item): + return item['key'] + TEST_CLASS = TestActor + + + def build_monitor(self, side_effect, *args, **kwargs): + super(RemotePollLoopActorWithKeysTestCase, self).build_monitor( + *args, **kwargs) + self.client.side_effect = side_effect + + def test_key_subscription(self): + self.build_monitor([[{'key': 1}, {'key': 2}]]) + self.monitor.subscribe_to(2, self.subscriber) + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with({'key': 2}) + + def test_survive_dead_key_subscriptions(self): + item = {'key': 3} + self.build_monitor([[item], [item]]) + dead_subscriber = mock.Mock(name='dead_subscriber') + dead_subscriber.side_effect = pykka.ActorDeadError + self.monitor.subscribe_to(3, dead_subscriber) + self.wait_for_call(dead_subscriber) + self.monitor.subscribe_to(3, self.subscriber) + self.monitor.poll() + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with(item) + self.assertTrue(self.monitor.actor_ref.is_alive(), + "poll loop died from dead key subscriber") + + def test_mixed_subscriptions(self): + item = {'key': 4} + self.build_monitor([[item], [item]]) + key_subscriber = mock.Mock(name='key_subscriber') + self.monitor.subscribe(self.subscriber) + self.monitor.subscribe_to(4, key_subscriber) + self.monitor.poll() + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with([item]) + key_subscriber.assert_called_with(item) + + def test_subscription_to_missing_key(self): + self.build_monitor([[]]) + self.monitor.subscribe_to('nonesuch', self.subscriber) + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with(None) + + +if __name__ == '__main__': + unittest.main() + diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py new file mode 100644 index 0000000000..2fc7a504f4 --- /dev/null +++ b/services/nodemanager/tests/test_computenode.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import time +import unittest + +import arvados.errors as arverror +import httplib2 +import mock +import pykka + +import arvnodeman.computenode as cnode +from . 
import testutil
+
+class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+    def make_mocks(self, arvados_effect=None, cloud_effect=None):
+        if arvados_effect is None:
+            arvados_effect = [testutil.arvados_node_mock()]
+        self.timer = testutil.MockTimer()
+        self.api_client = mock.MagicMock(name='api_client')
+        self.api_client.nodes().create().execute.side_effect = arvados_effect
+        self.api_client.nodes().update().execute.side_effect = arvados_effect
+        self.cloud_client = mock.MagicMock(name='cloud_client')
+        self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
+
+    def make_actor(self, arv_node=None):
+        if not hasattr(self, 'timer'):
+            self.make_mocks()
+        self.setup_actor = cnode.ComputeNodeSetupActor.start(
+            self.timer, self.api_client, self.cloud_client,
+            testutil.MockSize(1), arv_node).proxy()
+
+    def test_creation_without_arvados_node(self):
+        self.make_actor()
+        self.wait_for_call(self.api_client.nodes().create().execute)
+        self.wait_for_call(self.cloud_client.create_node)
+
+    def test_creation_with_arvados_node(self):
+        arv_node = testutil.arvados_node_mock()
+        self.make_mocks([arv_node])
+        self.make_actor(arv_node)
+        self.wait_for_call(self.api_client.nodes().update().execute)
+        self.wait_for_call(self.cloud_client.create_node)
+
+    def test_failed_calls_retried(self):
+        self.make_mocks([
+            arverror.ApiError(httplib2.Response({'status': '500'}), ""),
+            testutil.arvados_node_mock(),
+            ])
+        self.make_actor()
+        self.wait_for_call(self.cloud_client.create_node)
+
+    def test_stop_when_no_cloud_node(self):
+        self.make_mocks(
+            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+        self.make_actor()
+        self.wait_for_call(self.api_client.nodes().create().execute)
+        self.setup_actor.stop_if_no_cloud_node()
+        self.assertTrue(
+            self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
+
+    def test_no_stop_when_cloud_node(self):
+        self.make_actor()
+        self.wait_for_call(self.cloud_client.create_node)
+        self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
+        self.assertFalse(self.setup_actor.actor_ref.actor_stopped.is_set())
+
+    def test_subscribe(self):
+        self.make_mocks(
+            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.setup_actor.subscribe(subscriber)
+        self.api_client.nodes().create().execute.side_effect = [
+            testutil.arvados_node_mock()]
+        self.wait_for_call(subscriber)
+        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+    def test_late_subscribe(self):
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.wait_for_call(self.cloud_client.create_node)
+        self.setup_actor.subscribe(subscriber)
+        self.wait_for_call(subscriber)
+        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
+class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
+                                       unittest.TestCase):
+    def make_mocks(self, cloud_node=None):
+        self.timer = testutil.MockTimer()
+        self.cloud_client = mock.MagicMock(name='cloud_client')
+        if cloud_node is None:
+            cloud_node = testutil.cloud_node_mock()
+        self.cloud_node = cloud_node
+
+    def make_actor(self, arv_node=None):
+        if not hasattr(self, 'timer'):
+            self.make_mocks()
+        self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
+            self.timer, self.cloud_client, self.cloud_node).proxy()
+
+    def test_easy_shutdown(self):
+        self.make_actor()
+        self.wait_for_call(self.cloud_client.destroy_node)
+
+
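+# A note on the mock pattern below: ComputeNodeUpdateActor is started with
+# a driver factory, and a MagicMock returns the same child mock every time
+# self.driver() is called, so the assertion in test_node_sync can inspect
+# the sync_node call the actor makes on the driver it builds.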
+class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
+                                     unittest.TestCase):
+    def make_actor(self):
+        self.driver = mock.MagicMock(name='driver_mock')
+        self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()
+
+    def test_node_sync(self):
+        self.make_actor()
+        cloud_node = testutil.cloud_node_mock()
+        arv_node = testutil.arvados_node_mock()
+        self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
+        self.driver().sync_node.assert_called_with(cloud_node, arv_node)
+
+
+@mock.patch('time.time', return_value=1)
+class ShutdownTimerTestCase(unittest.TestCase):
+    def test_two_length_window(self, time_mock):
+        timer = cnode.ShutdownTimer(time_mock.return_value, [8, 2])
+        self.assertEqual(481, timer.next_opening())
+        self.assertFalse(timer.window_open())
+        time_mock.return_value += 500
+        self.assertEqual(1081, timer.next_opening())
+        self.assertTrue(timer.window_open())
+        time_mock.return_value += 200
+        self.assertEqual(1081, timer.next_opening())
+        self.assertFalse(timer.window_open())
+
+    def test_three_length_window(self, time_mock):
+        timer = cnode.ShutdownTimer(time_mock.return_value, [6, 3, 1])
+        self.assertEqual(361, timer.next_opening())
+        self.assertFalse(timer.window_open())
+        time_mock.return_value += 400
+        self.assertEqual(961, timer.next_opening())
+        self.assertTrue(timer.window_open())
+        time_mock.return_value += 200
+        self.assertEqual(961, timer.next_opening())
+        self.assertFalse(timer.window_open())
+
+
+class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+                                      unittest.TestCase):
+    class MockShutdownTimer(object):
+        def _set_state(self, is_open, next_opening):
+            self.window_open = lambda: is_open
+            self.next_opening = lambda: next_opening
+
+
+    def make_mocks(self, node_num):
+        self.shutdowns = self.MockShutdownTimer()
+        self.shutdowns._set_state(False, 300)
+        self.timer = mock.MagicMock(name='timer_mock')
+        self.updates = mock.MagicMock(name='update_mock')
+        self.cloud_mock = testutil.cloud_node_mock(node_num)
+        self.subscriber = mock.Mock(name='subscriber_mock')
+
+    def make_actor(self, node_num=1, arv_node=None, start_time=None):
+        if not hasattr(self, 'cloud_mock'):
+            self.make_mocks(node_num)
+        if start_time is None:
+            start_time = time.time()
+        self.node_actor = cnode.ComputeNodeMonitorActor.start(
+            self.cloud_mock, start_time, self.shutdowns, self.timer,
+            self.updates, arv_node).proxy()
+        self.node_actor.subscribe(self.subscriber)
+
+    def test_init_shutdown_scheduling(self):
+        self.make_actor()
+        self.wait_for_call(self.timer.schedule)
+        self.assertEqual(300, self.timer.schedule.call_args[0][0])
+
+    def test_shutdown_subscription(self):
+        self.make_actor()
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown()
+        self.wait_for_call(self.subscriber)
+        self.assertEqual(self.node_actor.actor_ref.actor_urn,
+                         self.subscriber.call_args[0][0].actor_ref.actor_urn)
+
+    def test_shutdown_without_arvados_node(self):
+        self.make_actor()
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown()
+        self.wait_for_call(self.subscriber)
+
+    def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
+        self.make_actor(start_time=0)
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown()
+        self.assertFalse(self.subscriber.called)
+
+    def check_shutdown_rescheduled(self, window_open, next_window,
+                                   schedule_time=None):
+        self.shutdowns._set_state(window_open, next_window)
+        self.timer.schedule.reset_mock()
+        self.node_actor.consider_shutdown()
+        
self.wait_for_call(self.timer.schedule) + if schedule_time is not None: + self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0]) + self.assertFalse(self.subscriber.called) + + def test_shutdown_window_close_scheduling(self): + self.make_actor() + self.check_shutdown_rescheduled(False, 600, 600) + + def test_no_shutdown_when_node_running_job(self): + self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True)) + self.check_shutdown_rescheduled(True, 600) + + def test_no_shutdown_when_node_state_unknown(self): + self.make_actor(5, testutil.arvados_node_mock(5, info={})) + self.check_shutdown_rescheduled(True, 600) + + def test_no_shutdown_when_node_state_stale(self): + self.make_actor(6, testutil.arvados_node_mock(6, age=900)) + self.check_shutdown_rescheduled(True, 600) + + def test_arvados_node_match(self): + self.make_actor(2) + arv_node = testutil.arvados_node_mock( + 2, hostname='compute-two.zzzzz.arvadosapi.com') + pair_future = self.node_actor.offer_arvados_pair(arv_node) + self.assertEqual(self.cloud_mock.id, pair_future.get(self.TIMEOUT)) + self.wait_for_call(self.updates.sync_node) + self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node) + + def test_arvados_node_mismatch(self): + self.make_actor(3) + arv_node = testutil.arvados_node_mock(1) + pair_future = self.node_actor.offer_arvados_pair(arv_node) + self.assertIsNone(pair_future.get(self.TIMEOUT)) + + def test_update_cloud_node(self): + self.make_actor(1) + self.make_mocks(2) + self.cloud_mock.id = '1' + self.node_actor.update_cloud_node(self.cloud_mock) + current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT) + self.assertEqual([testutil.ip_address_mock(2)], + current_cloud.private_ips) + + def test_missing_cloud_node_update(self): + self.make_actor(1) + self.node_actor.update_cloud_node(None) + current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT) + self.assertEqual([testutil.ip_address_mock(1)], + current_cloud.private_ips) + + def test_update_arvados_node(self): + self.make_actor(3) + job_uuid = 'zzzzz-jjjjj-updatejobnode00' + new_arvados = testutil.arvados_node_mock(3, job_uuid) + self.node_actor.update_arvados_node(new_arvados) + current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT) + self.assertEqual(job_uuid, current_arvados['job_uuid']) + + def test_missing_arvados_node_update(self): + self.make_actor(4, testutil.arvados_node_mock(4)) + self.node_actor.update_arvados_node(None) + current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT) + self.assertEqual(testutil.ip_address_mock(4), + current_arvados['ip_address']) diff --git a/services/nodemanager/tests/test_computenode_ec2.py b/services/nodemanager/tests/test_computenode_ec2.py new file mode 100644 index 0000000000..d1c9e43fe4 --- /dev/null +++ b/services/nodemanager/tests/test_computenode_ec2.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import time +import unittest + +import mock + +import arvnodeman.computenode.ec2 as ec2 +from . 
import testutil
+
+class EC2ComputeNodeDriverTestCase(unittest.TestCase):
+    def setUp(self):
+        self.driver_mock = mock.MagicMock(name='driver_mock')
+
+    def new_driver(self, auth_kwargs={}, list_kwargs={}, create_kwargs={}):
+        create_kwargs.setdefault('ping_host', '100::')
+        return ec2.ComputeNodeDriver(
+            auth_kwargs, list_kwargs, create_kwargs,
+            driver_class=self.driver_mock)
+
+    def test_driver_instantiation(self):
+        kwargs = {'key': 'testkey'}
+        driver = self.new_driver(auth_kwargs=kwargs)
+        self.assertTrue(self.driver_mock.called)
+        self.assertEqual(kwargs, self.driver_mock.call_args[1])
+
+    def test_list_kwargs_become_filters(self):
+        # We're also testing tag name translation.
+        driver = self.new_driver(list_kwargs={'tag_test': 'true'})
+        driver.list_nodes()
+        list_method = self.driver_mock().list_nodes
+        self.assertTrue(list_method.called)
+        self.assertEqual({'tag:test': 'true'},
+                         list_method.call_args[1].get('ex_filters'))
+
+    def test_create_location_loaded_at_initialization(self):
+        kwargs = {'location': 'testregion'}
+        driver = self.new_driver(create_kwargs=kwargs)
+        self.assertTrue(self.driver_mock().list_locations.called)
+
+    def test_create_image_loaded_at_initialization(self):
+        kwargs = {'image': 'testimage'}
+        driver = self.new_driver(create_kwargs=kwargs)
+        self.assertTrue(self.driver_mock().list_images.called)
+
+    def test_create_includes_ping_secret(self):
+        arv_node = testutil.arvados_node_mock(info={'ping_secret': 'ssshh'})
+        driver = self.new_driver()
+        driver.create_node(testutil.MockSize(1), arv_node)
+        create_method = self.driver_mock().create_node
+        self.assertTrue(create_method.called)
+        self.assertIn('ping_secret=ssshh',
+                      create_method.call_args[1].get('ex_userdata',
+                                                     'arg missing'))
+
+    def test_tags_created_from_arvados_node(self):
+        arv_node = testutil.arvados_node_mock(8)
+        cloud_node = testutil.cloud_node_mock(8)
+        driver = self.new_driver(list_kwargs={'tag:list': 'test'})
+        self.assertEqual({'ex_metadata': {'list': 'test'},
+                          'name': 'compute8.zzzzz.arvadosapi.com'},
+                         driver.arvados_create_kwargs(arv_node))
+
+    def test_tags_set_default_hostname_from_new_arvados_node(self):
+        arv_node = testutil.arvados_node_mock(hostname=None)
+        driver = self.new_driver()
+        actual = driver.arvados_create_kwargs(arv_node)
+        self.assertEqual('dynamic.compute.zzzzz.arvadosapi.com',
+                         actual['name'])
+
+    def test_sync_node(self):
+        arv_node = testutil.arvados_node_mock(1)
+        cloud_node = testutil.cloud_node_mock(2)
+        driver = self.new_driver()
+        driver.sync_node(cloud_node, arv_node)
+        tag_mock = self.driver_mock().ex_create_tags
+        self.assertTrue(tag_mock.called)
+        self.assertEqual('compute1.zzzzz.arvadosapi.com',
+                         tag_mock.call_args[0][1].get('Name', 'no name'))
+
+    def test_node_create_time(self):
+        refsecs = int(time.time())
+        reftuple = time.gmtime(refsecs)
+        node = testutil.cloud_node_mock()
+        node.extra = {'launch_time': time.strftime('%Y-%m-%dT%H:%M:%S.000Z',
+                                                   reftuple)}
+        self.assertEqual(refsecs, ec2.ComputeNodeDriver.node_start_time(node))
diff --git a/services/nodemanager/tests/test_config.py b/services/nodemanager/tests/test_config.py
new file mode 100644
index 0000000000..3aa95410c4
--- /dev/null
+++ b/services/nodemanager/tests/test_config.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import io
+import logging
+import unittest
+
+import arvnodeman.config as nmconfig
+
+class NodeManagerConfigTestCase(unittest.TestCase):
+    TEST_CONFIG = u"""
+[Cloud]
+provider = dummy
+shutdown_windows = 52, 6, 2
+
+[Cloud 
Credentials] +creds = dummy_creds + +[Cloud List] +[Cloud Create] + +[Size 1] +cores = 1 + +[Logging] +file = /dev/null +level = DEBUG +testlogger = INFO +""" + + def load_config(self, config=None, config_str=None): + if config is None: + config = nmconfig.NodeManagerConfig() + if config_str is None: + config_str = self.TEST_CONFIG + with io.StringIO(config_str) as config_fp: + config.readfp(config_fp) + return config + + def test_seeded_defaults(self): + config = nmconfig.NodeManagerConfig() + sec_names = set(config.sections()) + self.assertIn('Arvados', sec_names) + self.assertIn('Daemon', sec_names) + self.assertFalse(any(name.startswith('Size ') for name in sec_names)) + + def test_list_sizes(self): + config = self.load_config() + client = config.new_cloud_client() + sizes = config.node_sizes(client.list_sizes()) + self.assertEqual(1, len(sizes)) + size, kwargs = sizes[0] + self.assertEqual('Small', size.name) + self.assertEqual(1, kwargs['cores']) + + def test_shutdown_windows(self): + config = self.load_config() + self.assertEqual([52, 6, 2], config.shutdown_windows()) + + def test_log_levels(self): + config = self.load_config() + self.assertEqual({'level': logging.DEBUG, + 'testlogger': logging.INFO}, + config.log_levels()) diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py new file mode 100644 index 0000000000..176b096714 --- /dev/null +++ b/services/nodemanager/tests/test_daemon.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import time +import unittest + +import mock + +import arvnodeman.daemon as nmdaemon +from . import testutil + +class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, + unittest.TestCase): + def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[]): + for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']: + setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock')) + self.arv_factory = mock.MagicMock(name='arvados_mock') + self.cloud_factory = mock.MagicMock(name='cloud_mock') + self.cloud_factory().node_start_time.return_value = time.time() + self.cloud_updates = mock.MagicMock(name='updates_mock') + self.timer = testutil.MockTimer() + self.node_factory = mock.MagicMock(name='factory_mock') + self.node_setup = mock.MagicMock(name='setup_mock') + self.node_shutdown = mock.MagicMock(name='shutdown_mock') + self.daemon = nmdaemon.NodeManagerDaemonActor.start( + self.server_wishlist_poller, self.arvados_nodes_poller, + self.cloud_nodes_poller, self.cloud_updates, self.timer, + self.arv_factory, self.cloud_factory, + [54, 5, 1], 8, 600, 3600, + self.node_setup, self.node_shutdown, self.node_factory).proxy() + if cloud_nodes is not None: + self.daemon.update_cloud_nodes(cloud_nodes) + if arvados_nodes is not None: + self.daemon.update_arvados_nodes(arvados_nodes) + if want_sizes is not None: + self.daemon.update_server_wishlist(want_sizes) + + def test_easy_node_creation(self): + size = testutil.MockSize(1) + self.make_daemon(want_sizes=[size]) + self.wait_for_call(self.node_setup.start) + + def test_node_pairing(self): + cloud_node = testutil.cloud_node_mock(1) + arv_node = testutil.arvados_node_mock(1) + self.make_daemon([cloud_node], [arv_node]) + self.wait_for_call(self.node_factory.start) + pair_func = self.node_factory.start().proxy().offer_arvados_pair + self.wait_for_call(pair_func) + pair_func.assert_called_with(arv_node) + + def test_node_pairing_after_arvados_update(self): + cloud_node = 
testutil.cloud_node_mock(2)
+        arv_node = testutil.arvados_node_mock(2, ip_address=None)
+        self.make_daemon([cloud_node], None)
+        pair_func = self.node_factory.start().proxy().offer_arvados_pair
+        pair_func().get.return_value = None
+        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+        pair_func.assert_called_with(arv_node)
+
+        pair_func().get.return_value = cloud_node.id
+        pair_func.reset_mock()
+        arv_node = testutil.arvados_node_mock(2)
+        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+        pair_func.assert_called_with(arv_node)
+
+    def test_old_arvados_node_not_double_assigned(self):
+        arv_node = testutil.arvados_node_mock(3, age=9000)
+        size = testutil.MockSize(3)
+        self.make_daemon(arvados_nodes=[arv_node], want_sizes=[size, size])
+        node_starter = self.node_setup.start
+        deadline = time.time() + self.TIMEOUT
+        while (time.time() < deadline) and (node_starter.call_count < 2):
+            time.sleep(.1)
+        self.assertEqual(2, node_starter.call_count)
+        used_nodes = [call[1].get('arvados_node')
+                      for call in node_starter.call_args_list]
+        self.assertIn(arv_node, used_nodes)
+        self.assertIn(None, used_nodes)
+
+    def test_node_count_satisfied(self):
+        self.make_daemon([testutil.cloud_node_mock()])
+        self.daemon.update_server_wishlist(
+            [testutil.MockSize(1)]).get(self.TIMEOUT)
+        self.assertFalse(self.node_setup.called)
+
+    def test_booting_nodes_counted(self):
+        cloud_node = testutil.cloud_node_mock(1)
+        arv_node = testutil.arvados_node_mock(1)
+        server_wishlist = [testutil.MockSize(1)] * 2
+        self.make_daemon([cloud_node], [arv_node], server_wishlist)
+        self.wait_for_call(self.node_setup.start)
+        self.node_setup.reset_mock()
+        self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
+        self.assertFalse(self.node_setup.called)
+
+    def test_no_duplication_when_booting_node_listed_fast(self):
+        # Test that we don't start two ComputeNodeMonitorActors when
+        # we learn about a booting node through a listing before we
+        # get the "node up" message from ComputeNodeSetupActor.
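+        # (Here the daemon sees the booting node in a cloud listing first
+        # and starts one monitor for it; the later node_up message for the
+        # same cloud node must not start a second one.)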
+ cloud_node = testutil.cloud_node_mock(1) + self.make_daemon(want_sizes=[testutil.MockSize(1)]) + self.wait_for_call(self.node_setup.start) + setup = mock.MagicMock(name='setup_node_mock') + setup.actor_ref = self.node_setup.start().proxy().actor_ref + setup.cloud_node.get.return_value = cloud_node + setup.arvados_node.get.return_value = testutil.arvados_node_mock(1) + self.daemon.update_cloud_nodes([cloud_node]) + self.wait_for_call(self.node_factory.start) + self.node_factory.reset_mock() + self.daemon.node_up(setup).get(self.TIMEOUT) + self.assertFalse(self.node_factory.start.called) + + def test_booting_nodes_shut_down(self): + self.make_daemon(want_sizes=[testutil.MockSize(1)]) + self.wait_for_call(self.node_setup.start) + self.daemon.update_server_wishlist([]) + self.wait_for_call( + self.node_setup.start().proxy().stop_if_no_cloud_node) + + def test_shutdown_declined_at_wishlist_capacity(self): + cloud_node = testutil.cloud_node_mock(1) + size = testutil.MockSize(1) + self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size]) + node_actor = self.node_factory().proxy() + self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT) + self.assertFalse(node_actor.shutdown.called) + + def test_shutdown_accepted_below_capacity(self): + self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()]) + node_actor = self.node_factory().proxy() + self.daemon.node_can_shutdown(node_actor) + self.wait_for_call(self.node_shutdown.start) + + def test_clean_shutdown_waits_for_node_setup_finish(self): + self.make_daemon(want_sizes=[testutil.MockSize(1)]) + self.wait_for_call(self.node_setup.start) + new_node = self.node_setup.start().proxy() + self.daemon.shutdown() + self.wait_for_call(new_node.stop_if_no_cloud_node) + self.daemon.node_up(new_node) + self.wait_for_call(new_node.stop) + self.assertTrue( + self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT)) + + def test_wishlist_ignored_after_shutdown(self): + size = testutil.MockSize(2) + self.make_daemon(want_sizes=[size]) + node_starter = self.node_setup.start + self.wait_for_call(node_starter) + node_starter.reset_mock() + self.daemon.shutdown() + self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT) + # Send another message and wait for a response, to make sure all + # internal messages generated by the wishlist update are processed. + self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT) + self.assertFalse(node_starter.called) diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py new file mode 100644 index 0000000000..3814ba4610 --- /dev/null +++ b/services/nodemanager/tests/test_jobqueue.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import unittest + +import arvnodeman.jobqueue as jobqueue +from . 
import testutil + +class ServerCalculatorTestCase(unittest.TestCase): + def make_calculator(self, factors, **kwargs): + return jobqueue.ServerCalculator( + [(testutil.MockSize(n), {'cores': n}) for n in factors], **kwargs) + + def calculate(self, servcalc, *constraints): + return servcalc.servers_for_queue( + [{'runtime_constraints': cdict} for cdict in constraints]) + + def test_empty_queue_needs_no_servers(self): + servcalc = self.make_calculator([1]) + self.assertEqual([], servcalc.servers_for_queue([])) + + def test_easy_server_count(self): + servcalc = self.make_calculator([1]) + servlist = self.calculate(servcalc, {'min_nodes': 3}) + self.assertEqual(3, len(servlist)) + + def test_implicit_server_count(self): + servcalc = self.make_calculator([1]) + servlist = self.calculate(servcalc, {}, {'min_nodes': 3}) + self.assertEqual(4, len(servlist)) + + def test_bad_min_nodes_override(self): + servcalc = self.make_calculator([1]) + servlist = self.calculate(servcalc, + {'min_nodes': -2}, {'min_nodes': 'foo'}) + self.assertEqual(2, len(servlist)) + + def test_ignore_unsatisfiable_jobs(self): + servcalc = self.make_calculator([1], max_nodes=9) + servlist = self.calculate(servcalc, + {'min_cores_per_node': 2}, + {'min_ram_mb_per_node': 256}, + {'min_nodes': 6}, + {'min_nodes': 12}, + {'min_scratch_mb_per_node': 200}) + self.assertEqual(6, len(servlist)) + + +class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin, + unittest.TestCase): + TEST_CLASS = jobqueue.JobQueueMonitorActor + + class MockCalculator(object): + @staticmethod + def servers_for_queue(queue): + return [testutil.MockSize(n) for n in queue] + + + def build_monitor(self, side_effect, *args, **kwargs): + super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs) + self.client.jobs().queue().execute.side_effect = side_effect + + def test_subscribers_get_server_lists(self): + self.build_monitor([{'items': [1, 2]}], self.MockCalculator()) + self.monitor.subscribe(self.subscriber) + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with([testutil.MockSize(1), + testutil.MockSize(2)]) + + +if __name__ == '__main__': + unittest.main() + diff --git a/services/nodemanager/tests/test_nodelist.py b/services/nodemanager/tests/test_nodelist.py new file mode 100644 index 0000000000..d9f47e2605 --- /dev/null +++ b/services/nodemanager/tests/test_nodelist.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import unittest + +import arvnodeman.nodelist as nodelist +from . 
import testutil + +class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin, + unittest.TestCase): + TEST_CLASS = nodelist.ArvadosNodeListMonitorActor + + def build_monitor(self, side_effect, *args, **kwargs): + super(ArvadosNodeListMonitorActorTestCase, self).build_monitor( + *args, **kwargs) + self.client.nodes().list().execute.side_effect = side_effect + + def test_uuid_is_subscription_key(self): + node = testutil.arvados_node_mock() + self.build_monitor([{'items': [node]}]) + self.monitor.subscribe_to(node['uuid'], self.subscriber) + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with(node) + + +class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin, + unittest.TestCase): + TEST_CLASS = nodelist.CloudNodeListMonitorActor + + class MockNode(object): + def __init__(self, count): + self.id = str(count) + self.name = 'test{}.example.com'.format(count) + self.private_ips = ['10.0.0.{}'.format(count)] + self.public_ips = [] + self.size = None + self.state = 0 + + + def build_monitor(self, side_effect, *args, **kwargs): + super(CloudNodeListMonitorActorTestCase, self).build_monitor( + *args, **kwargs) + self.client.list_nodes.side_effect = side_effect + + def test_id_is_subscription_key(self): + node = self.MockNode(1) + self.build_monitor([[node]]) + self.monitor.subscribe_to('1', self.subscriber) + self.wait_for_call(self.subscriber) + self.subscriber.assert_called_with(node) + + +if __name__ == '__main__': + unittest.main() + diff --git a/services/nodemanager/tests/test_timedcallback.py b/services/nodemanager/tests/test_timedcallback.py new file mode 100644 index 0000000000..60f7b81bad --- /dev/null +++ b/services/nodemanager/tests/test_timedcallback.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import time +import unittest + +import mock +import pykka + +import arvnodeman.timedcallback as timedcallback +from . 
import testutil + +@testutil.no_sleep +class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase): + def test_immediate_turnaround(self): + future = self.FUTURE_CLASS() + deliverer = timedcallback.TimedCallBackActor.start().proxy() + deliverer.schedule(time.time() - 1, future.set, 'immediate') + self.assertEqual('immediate', future.get(self.TIMEOUT)) + + def test_delayed_turnaround(self): + future = self.FUTURE_CLASS() + with mock.patch('time.time', return_value=0) as mock_now: + deliverer = timedcallback.TimedCallBackActor.start().proxy() + deliverer.schedule(1, future.set, 'delayed') + self.assertRaises(pykka.Timeout, future.get, .5) + mock_now.return_value = 2 + self.assertEqual('delayed', future.get(self.TIMEOUT)) + + def test_out_of_order_scheduling(self): + future1 = self.FUTURE_CLASS() + future2 = self.FUTURE_CLASS() + with mock.patch('time.time', return_value=1.5) as mock_now: + deliverer = timedcallback.TimedCallBackActor.start().proxy() + deliverer.schedule(2, future2.set, 'second') + deliverer.schedule(1, future1.set, 'first') + self.assertEqual('first', future1.get(self.TIMEOUT)) + self.assertRaises(pykka.Timeout, future2.get, .1) + mock_now.return_value = 3 + self.assertEqual('second', future2.get(self.TIMEOUT)) + + def test_dead_actors_ignored(self): + receiver = mock.Mock(name='dead_actor', spec=pykka.ActorRef) + receiver.tell.side_effect = pykka.ActorDeadError + deliverer = timedcallback.TimedCallBackActor.start().proxy() + deliverer.schedule(time.time() - 1, receiver.tell, 'error') + self.wait_for_call(receiver.tell) + receiver.tell.assert_called_with('error') + self.assertTrue(deliverer.actor_ref.is_alive(), "deliverer died") + + +if __name__ == '__main__': + unittest.main() + diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py new file mode 100644 index 0000000000..a33f76fe86 --- /dev/null +++ b/services/nodemanager/tests/testutil.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import time + +import mock +import pykka + +no_sleep = mock.patch('time.sleep', lambda n: None) + +def arvados_node_mock(node_num=99, job_uuid=None, age=0, **kwargs): + if job_uuid is True: + job_uuid = 'zzzzz-jjjjj-jobjobjobjobjob' + slurm_state = 'idle' if (job_uuid is None) else 'alloc' + node = {'uuid': 'zzzzz-yyyyy-12345abcde67890', + 'created_at': '2014-01-01T01:02:03Z', + 'modified_at': time.strftime('%Y-%m-%dT%H:%M:%SZ', + time.gmtime(time.time() - age)), + 'hostname': 'compute{}'.format(node_num), + 'domain': 'zzzzz.arvadosapi.com', + 'ip_address': ip_address_mock(node_num), + 'job_uuid': job_uuid, + 'info': {'slurm_state': slurm_state}} + node.update(kwargs) + return node + +def cloud_node_mock(node_num=99): + node = mock.NonCallableMagicMock( + ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size', + 'image', 'extra'], + name='cloud_node') + node.id = str(node_num) + node.name = node.id + node.public_ips = [] + node.private_ips = [ip_address_mock(node_num)] + return node + +def ip_address_mock(last_octet): + return '10.20.30.{}'.format(last_octet) + +class MockSize(object): + def __init__(self, factor): + self.id = 'z{}.test'.format(factor) + self.name = self.id + self.ram = 128 * factor + self.disk = 100 * factor + self.bandwidth = 16 * factor + self.price = float(factor) + self.extra = {} + + def __eq__(self, other): + return self.id == other.id + + +class MockTimer(object): + def schedule(self, want_time, callback, *args, **kwargs): + return 
callback(*args, **kwargs) + + +class ActorTestMixin(object): + FUTURE_CLASS = pykka.ThreadingFuture + TIMEOUT = 5 + + def tearDown(self): + pykka.ActorRegistry.stop_all() + + def wait_for_call(self, mock_func, timeout=TIMEOUT): + deadline = time.time() + timeout + while (not mock_func.called) and (time.time() < deadline): + time.sleep(.1) + self.assertTrue(mock_func.called, "{} not called".format(mock_func)) + + +class RemotePollLoopActorTestMixin(ActorTestMixin): + def build_monitor(self, *args, **kwargs): + self.timer = mock.MagicMock(name='timer_mock') + self.client = mock.MagicMock(name='client_mock') + self.subscriber = mock.Mock(name='subscriber_mock') + self.monitor = self.TEST_CLASS.start( + self.client, self.timer, *args, **kwargs).proxy()
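+
+# A sketch of how RemotePollLoopActorTestMixin is typically used (the
+# monitor class and client call here are hypothetical; real examples live
+# in tests/test_nodelist.py and tests/test_jobqueue.py):
+#
+#   class MyMonitorTestCase(testutil.RemotePollLoopActorTestMixin,
+#                           unittest.TestCase):
+#       TEST_CLASS = mymodule.MyMonitorActor  # polls self._client.fetch()
+#
+#       def test_subscriber_gets_response(self):
+#           self.build_monitor()
+#           self.client.fetch.side_effect = [['response']]
+#           self.monitor.subscribe(self.subscriber)
+#           self.wait_for_call(self.subscriber)
+#           self.subscriber.assert_called_with(['response'])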