4380: Reorganize arvnodeman.computenode.
author Brett Smith <brett@curoverse.com>
Tue, 11 Nov 2014 22:23:14 +0000 (17:23 -0500)
committer Brett Smith <brett@curoverse.com>
Wed, 12 Nov 2014 18:59:01 +0000 (13:59 -0500)
This makes the hierarchy a little richer:

* arvnodeman.computenode.driver has all the cloud driver wrappers.
* arvnodeman.computenode.dispatch will be organized just like driver,
  except its submodules will address local dispatch concerns.  For
  example, I'm going to add a SLURM submodule here to take care of draining.
* arvnodeman.computenode still has utility functions and
  ShutdownTimer.

12 files changed:
services/nodemanager/arvnodeman/computenode/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py [new file with mode: 0644]
services/nodemanager/arvnodeman/computenode/driver/__init__.py [new file with mode: 0644]
services/nodemanager/arvnodeman/computenode/driver/dummy.py [moved from services/nodemanager/arvnodeman/computenode/dummy.py with 96% similarity]
services/nodemanager/arvnodeman/computenode/driver/ec2.py [moved from services/nodemanager/arvnodeman/computenode/ec2.py with 98% similarity]
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/tests/test_computenode.py
services/nodemanager/tests/test_computenode_dispatch.py [new file with mode: 0644]
services/nodemanager/tests/test_computenode_driver_ec2.py [moved from services/nodemanager/tests/test_computenode_ec2.py with 98% similarity]
services/nodemanager/tests/test_daemon.py

index 63effe9d37bd1150380c5723f02ca15db7028c0b..4955992faa4d7cc2da43e2d39b940615a1c63710 100644 (file)
@@ -2,16 +2,9 @@
 
 from __future__ import absolute_import, print_function
 
-import functools
 import itertools
-import logging
 import time
 
-import pykka
-
-from ..clientactor import _notify_subscribers
-from .. import config
-
 def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
     hostname = arvados_node.get('hostname') or default_hostname
     return '{}.{}'.format(hostname, arvados_node['domain'])
@@ -23,252 +16,6 @@ def arvados_node_mtime(node):
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
 
-class BaseComputeNodeDriver(object):
-    """Abstract base class for compute node drivers.
-
-    libcloud abstracts away many of the differences between cloud providers,
-    but managing compute nodes requires some cloud-specific features (e.g.,
-    on EC2 we use tags to identify compute nodes).  Compute node drivers
-    are responsible for translating the node manager's cloud requests to a
-    specific cloud's vocabulary.
-
-    Subclasses must implement arvados_create_kwargs (to update node
-    creation kwargs with information about the specific Arvados node
-    record), sync_node, and node_start_time.
-    """
-    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
-        self.real = driver_class(**auth_kwargs)
-        self.list_kwargs = list_kwargs
-        self.create_kwargs = create_kwargs
-
-    def __getattr__(self, name):
-        # Proxy non-extension methods to the real driver.
-        if (not name.startswith('_') and not name.startswith('ex_')
-              and hasattr(self.real, name)):
-            return getattr(self.real, name)
-        else:
-            return super(BaseComputeNodeDriver, self).__getattr__(name)
-
-    def search_for(self, term, list_method, key=lambda item: item.id):
-        cache_key = (list_method, term)
-        if cache_key not in self.SEARCH_CACHE:
-            results = [item for item in getattr(self.real, list_method)()
-                       if key(item) == term]
-            count = len(results)
-            if count != 1:
-                raise ValueError("{} returned {} results for '{}'".format(
-                        list_method, count, term))
-            self.SEARCH_CACHE[cache_key] = results[0]
-        return self.SEARCH_CACHE[cache_key]
-
-    def list_nodes(self):
-        return self.real.list_nodes(**self.list_kwargs)
-
-    def arvados_create_kwargs(self, arvados_node):
-        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
-
-    def create_node(self, size, arvados_node):
-        kwargs = self.create_kwargs.copy()
-        kwargs.update(self.arvados_create_kwargs(arvados_node))
-        kwargs['size'] = size
-        return self.real.create_node(**kwargs)
-
-    def sync_node(self, cloud_node, arvados_node):
-        # When a compute node first pings the API server, the API server
-        # will automatically assign some attributes on the corresponding
-        # node record, like hostname.  This method should propagate that
-        # information back to the cloud node appropriately.
-        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
-
-    @classmethod
-    def node_start_time(cls, node):
-        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
-
-
-ComputeNodeDriverClass = BaseComputeNodeDriver
-
-class ComputeNodeStateChangeBase(config.actor_class):
-    """Base class for actors that change a compute node's state.
-
-    This base class takes care of retrying changes and notifying
-    subscribers when the change is finished.
-    """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
-        super(ComputeNodeStateChangeBase, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
-        self._logger = logging.getLogger(logger_name)
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
-        self.subscribers = set()
-
-    @staticmethod
-    def _retry(errors):
-        """Retry decorator for an actor method that makes remote requests.
-
-        Use this function to decorator an actor method, and pass in a
-        tuple of exceptions to catch.  This decorator will schedule
-        retries of that method with exponential backoff if the
-        original method raises any of the given errors.
-        """
-        def decorator(orig_func):
-            @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
-                try:
-                    orig_func(self, *args, **kwargs)
-                except errors as error:
-                    self._logger.warning(
-                        "Client error: %s - waiting %s seconds",
-                        error, self.retry_wait)
-                    self._timer.schedule(self.retry_wait,
-                                         getattr(self._later,
-                                                 orig_func.__name__),
-                                         *args, **kwargs)
-                    self.retry_wait = min(self.retry_wait * 2,
-                                          self.max_retry_wait)
-                else:
-                    self.retry_wait = self.min_retry_wait
-            return wrapper
-        return decorator
-
-    def _finished(self):
-        _notify_subscribers(self._later, self.subscribers)
-        self.subscribers = None
-
-    def subscribe(self, subscriber):
-        if self.subscribers is None:
-            try:
-                subscriber(self._later)
-            except pykka.ActorDeadError:
-                pass
-        else:
-            self.subscribers.add(subscriber)
-
-
-class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
-    """Actor to create and set up a cloud compute node.
-
-    This actor prepares an Arvados node record for a new compute node
-    (either creating one or cleaning one passed in), then boots the
-    actual compute node.  It notifies subscribers when the cloud node
-    is successfully created (the last step in the process for Node
-    Manager to handle).
-    """
-    def __init__(self, timer_actor, arvados_client, cloud_client,
-                 cloud_size, arvados_node=None,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
-        self._arvados = arvados_client
-        self._cloud = cloud_client
-        self.cloud_size = cloud_size
-        self.arvados_node = None
-        self.cloud_node = None
-        if arvados_node is None:
-            self._later.create_arvados_node()
-        else:
-            self._later.prepare_arvados_node(arvados_node)
-
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
-    def create_arvados_node(self):
-        self.arvados_node = self._arvados.nodes().create(body={}).execute()
-        self._later.create_cloud_node()
-
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
-    def prepare_arvados_node(self, node):
-        self.arvados_node = self._arvados.nodes().update(
-            uuid=node['uuid'],
-            body={'hostname': None,
-                  'ip_address': None,
-                  'slot_number': None,
-                  'first_ping_at': None,
-                  'last_ping_at': None,
-                  'info': {'ec2_instance_id': None,
-                           'last_action': "Prepared by Node Manager"}}
-            ).execute()
-        self._later.create_cloud_node()
-
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def create_cloud_node(self):
-        self._logger.info("Creating cloud node with size %s.",
-                          self.cloud_size.name)
-        self.cloud_node = self._cloud.create_node(self.cloud_size,
-                                                  self.arvados_node)
-        self._logger.info("Cloud node %s created.", self.cloud_node.id)
-        self._finished()
-
-    def stop_if_no_cloud_node(self):
-        if self.cloud_node is None:
-            self.stop()
-
-
-class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
-    """Actor to shut down a compute node.
-
-    This actor simply destroys a cloud node, retrying as needed.
-    """
-    def __init__(self, timer_actor, cloud_client, cloud_node,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
-        self.cloud_node = cloud_node
-        self._later.shutdown_node()
-
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def shutdown_node(self):
-        self._cloud.destroy_node(self.cloud_node)
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
-        self._finished()
-
-
-class ComputeNodeUpdateActor(config.actor_class):
-    """Actor to dispatch one-off cloud management requests.
-
-    This actor receives requests for small cloud updates, and
-    dispatches them to a real driver.  ComputeNodeMonitorActors use
-    this to perform maintenance tasks on themselves.  Having a
-    dedicated actor for this gives us the opportunity to control the
-    flow of requests; e.g., by backing off when errors occur.
-
-    This actor is most like a "traditional" Pykka actor: there's no
-    subscribing, but instead methods return real driver results.  If
-    you're interested in those results, you should get them from the
-    Future that the proxy method returns.  Be prepared to handle exceptions
-    from the cloud driver when you do.
-    """
-    def __init__(self, cloud_factory, max_retry_wait=180):
-        super(ComputeNodeUpdateActor, self).__init__()
-        self._cloud = cloud_factory()
-        self.max_retry_wait = max_retry_wait
-        self.error_streak = 0
-        self.next_request_time = time.time()
-
-    def _throttle_errors(orig_func):
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            throttle_time = self.next_request_time - time.time()
-            if throttle_time > 0:
-                time.sleep(throttle_time)
-            self.next_request_time = time.time()
-            try:
-                result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
-                self.error_streak += 1
-                self.next_request_time += min(2 ** self.error_streak,
-                                              self.max_retry_wait)
-                raise
-            else:
-                self.error_streak = 0
-                return result
-        return wrapper
-
-    @_throttle_errors
-    def sync_node(self, cloud_node, arvados_node):
-        return self._cloud.sync_node(cloud_node, arvados_node)
-
-
 class ShutdownTimer(object):
     """Keep track of a cloud node's shutdown windows.
 
@@ -309,103 +56,3 @@ class ShutdownTimer(object):
     def window_open(self):
         self._advance_opening()
         return 0 < (time.time() - self._open_start) < self._open_for
-
-
-class ComputeNodeMonitorActor(config.actor_class):
-    """Actor to manage a running compute node.
-
-    This actor gets updates about a compute node's cloud and Arvados records.
-    It uses this information to notify subscribers when the node is eligible
-    for shutdown.
-    """
-    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 timer_actor, update_actor, arvados_node=None,
-                 poll_stale_after=600, node_stale_after=3600):
-        super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.computenode')
-        self._last_log = None
-        self._shutdowns = shutdown_timer
-        self._timer = timer_actor
-        self._update = update_actor
-        self.cloud_node = cloud_node
-        self.cloud_node_start_time = cloud_node_start_time
-        self.poll_stale_after = poll_stale_after
-        self.node_stale_after = node_stale_after
-        self.subscribers = set()
-        self.arvados_node = None
-        self._later.update_arvados_node(arvados_node)
-        self.last_shutdown_opening = None
-        self._later.consider_shutdown()
-
-    def subscribe(self, subscriber):
-        self.subscribers.add(subscriber)
-
-    def _debug(self, msg, *args):
-        if msg == self._last_log:
-            return
-        self._last_log = msg
-        self._logger.debug(msg, *args)
-
-    def in_state(self, *states):
-        # Return a boolean to say whether or not our Arvados node record is in
-        # one of the given states.  If state information is not
-        # available--because this node has no Arvados record, the record is
-        # stale, or the record has no state information--return None.
-        if (self.arvados_node is None) or not timestamp_fresh(
-              arvados_node_mtime(self.arvados_node), self.node_stale_after):
-            return None
-        state = self.arvados_node['info'].get('slurm_state')
-        if not state:
-            return None
-        result = state in states
-        if state == 'idle':
-            result = result and not self.arvados_node['job_uuid']
-        return result
-
-    def _shutdown_eligible(self):
-        if self.arvados_node is None:
-            # If this is a new, unpaired node, it's eligible for
-            # shutdown--we figure there was an error during bootstrap.
-            return timestamp_fresh(self.cloud_node_start_time,
-                                   self.node_stale_after)
-        else:
-            return self.in_state('idle')
-
-    def consider_shutdown(self):
-        next_opening = self._shutdowns.next_opening()
-        if self._shutdowns.window_open():
-            if self._shutdown_eligible():
-                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-                _notify_subscribers(self._later, self.subscribers)
-            else:
-                self._debug("Node %s shutdown window open but node busy.",
-                            self.cloud_node.id)
-        else:
-            self._debug("Node %s shutdown window closed.  Next at %s.",
-                        self.cloud_node.id, time.ctime(next_opening))
-        if self.last_shutdown_opening != next_opening:
-            self._timer.schedule(next_opening, self._later.consider_shutdown)
-            self.last_shutdown_opening = next_opening
-
-    def offer_arvados_pair(self, arvados_node):
-        if self.arvados_node is not None:
-            return None
-        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
-            self._later.update_arvados_node(arvados_node)
-            return self.cloud_node.id
-        else:
-            return None
-
-    def update_cloud_node(self, cloud_node):
-        if cloud_node is not None:
-            self.cloud_node = cloud_node
-            self._later.consider_shutdown()
-
-    def update_arvados_node(self, arvados_node):
-        if arvados_node is not None:
-            self.arvados_node = arvados_node
-            new_hostname = arvados_node_fqdn(self.arvados_node)
-            if new_hostname != self.cloud_node.name:
-                self._update.sync_node(self.cloud_node, self.arvados_node)
-            self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
new file mode 100644 (file)
index 0000000..d613ef1
--- /dev/null
@@ -0,0 +1,294 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import functools
+import logging
+import time
+
+import pykka
+
+from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from ...clientactor import _notify_subscribers
+from ... import config
+
+class ComputeNodeStateChangeBase(config.actor_class):
+    """Base class for actors that change a compute node's state.
+
+    This base class takes care of retrying changes and notifying
+    subscribers when the change is finished.
+    """
+    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+        super(ComputeNodeStateChangeBase, self).__init__()
+        self._later = self.actor_ref.proxy()
+        self._timer = timer_actor
+        self._logger = logging.getLogger(logger_name)
+        self.min_retry_wait = retry_wait
+        self.max_retry_wait = max_retry_wait
+        self.retry_wait = retry_wait
+        self.subscribers = set()
+
+    @staticmethod
+    def _retry(errors):
+        """Retry decorator for an actor method that makes remote requests.
+
+        Use this function to decorate an actor method, and pass in a
+        tuple of exceptions to catch.  This decorator will schedule
+        retries of that method with exponential backoff if the
+        original method raises any of the given errors.
+        """
+        def decorator(orig_func):
+            @functools.wraps(orig_func)
+            def wrapper(self, *args, **kwargs):
+                try:
+                    orig_func(self, *args, **kwargs)
+                except errors as error:
+                    self._logger.warning(
+                        "Client error: %s - waiting %s seconds",
+                        error, self.retry_wait)
+                    self._timer.schedule(self.retry_wait,
+                                         getattr(self._later,
+                                                 orig_func.__name__),
+                                         *args, **kwargs)
+                    self.retry_wait = min(self.retry_wait * 2,
+                                          self.max_retry_wait)
+                else:
+                    self.retry_wait = self.min_retry_wait
+            return wrapper
+        return decorator
+
+    def _finished(self):
+        _notify_subscribers(self._later, self.subscribers)
+        self.subscribers = None
+
+    def subscribe(self, subscriber):
+        if self.subscribers is None:
+            try:
+                subscriber(self._later)
+            except pykka.ActorDeadError:
+                pass
+        else:
+            self.subscribers.add(subscriber)
+
+
+class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
+    """Actor to create and set up a cloud compute node.
+
+    This actor prepares an Arvados node record for a new compute node
+    (either creating one or cleaning one passed in), then boots the
+    actual compute node.  It notifies subscribers when the cloud node
+    is successfully created (the last step in the process for Node
+    Manager to handle).
+    """
+    def __init__(self, timer_actor, arvados_client, cloud_client,
+                 cloud_size, arvados_node=None,
+                 retry_wait=1, max_retry_wait=180):
+        super(ComputeNodeSetupActor, self).__init__(
+            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+        self._arvados = arvados_client
+        self._cloud = cloud_client
+        self.cloud_size = cloud_size
+        self.arvados_node = None
+        self.cloud_node = None
+        if arvados_node is None:
+            self._later.create_arvados_node()
+        else:
+            self._later.prepare_arvados_node(arvados_node)
+
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    def create_arvados_node(self):
+        self.arvados_node = self._arvados.nodes().create(body={}).execute()
+        self._later.create_cloud_node()
+
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    def prepare_arvados_node(self, node):
+        self.arvados_node = self._arvados.nodes().update(
+            uuid=node['uuid'],
+            body={'hostname': None,
+                  'ip_address': None,
+                  'slot_number': None,
+                  'first_ping_at': None,
+                  'last_ping_at': None,
+                  'info': {'ec2_instance_id': None,
+                           'last_action': "Prepared by Node Manager"}}
+            ).execute()
+        self._later.create_cloud_node()
+
+    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    def create_cloud_node(self):
+        self._logger.info("Creating cloud node with size %s.",
+                          self.cloud_size.name)
+        self.cloud_node = self._cloud.create_node(self.cloud_size,
+                                                  self.arvados_node)
+        self._logger.info("Cloud node %s created.", self.cloud_node.id)
+        self._finished()
+
+    def stop_if_no_cloud_node(self):
+        if self.cloud_node is None:
+            self.stop()
+
+
+class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
+    """Actor to shut down a compute node.
+
+    This actor simply destroys a cloud node, retrying as needed.
+    """
+    def __init__(self, timer_actor, cloud_client, cloud_node,
+                 retry_wait=1, max_retry_wait=180):
+        super(ComputeNodeShutdownActor, self).__init__(
+            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
+        self._cloud = cloud_client
+        self.cloud_node = cloud_node
+        self._later.shutdown_node()
+
+    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    def shutdown_node(self):
+        self._cloud.destroy_node(self.cloud_node)
+        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+        self._finished()
+
+
+class ComputeNodeUpdateActor(config.actor_class):
+    """Actor to dispatch one-off cloud management requests.
+
+    This actor receives requests for small cloud updates, and
+    dispatches them to a real driver.  ComputeNodeMonitorActors use
+    this to perform maintenance tasks on themselves.  Having a
+    dedicated actor for this gives us the opportunity to control the
+    flow of requests; e.g., by backing off when errors occur.
+
+    This actor is most like a "traditional" Pykka actor: there's no
+    subscribing, but instead methods return real driver results.  If
+    you're interested in those results, you should get them from the
+    Future that the proxy method returns.  Be prepared to handle exceptions
+    from the cloud driver when you do.
+    """
+    def __init__(self, cloud_factory, max_retry_wait=180):
+        super(ComputeNodeUpdateActor, self).__init__()
+        self._cloud = cloud_factory()
+        self.max_retry_wait = max_retry_wait
+        self.error_streak = 0
+        self.next_request_time = time.time()
+
+    def _throttle_errors(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            throttle_time = self.next_request_time - time.time()
+            if throttle_time > 0:
+                time.sleep(throttle_time)
+            self.next_request_time = time.time()
+            try:
+                result = orig_func(self, *args, **kwargs)
+            except config.CLOUD_ERRORS:
+                self.error_streak += 1
+                self.next_request_time += min(2 ** self.error_streak,
+                                              self.max_retry_wait)
+                raise
+            else:
+                self.error_streak = 0
+                return result
+        return wrapper
+
+    @_throttle_errors
+    def sync_node(self, cloud_node, arvados_node):
+        return self._cloud.sync_node(cloud_node, arvados_node)
+
+
+class ComputeNodeMonitorActor(config.actor_class):
+    """Actor to manage a running compute node.
+
+    This actor gets updates about a compute node's cloud and Arvados records.
+    It uses this information to notify subscribers when the node is eligible
+    for shutdown.
+    """
+    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
+                 timer_actor, update_actor, arvados_node=None,
+                 poll_stale_after=600, node_stale_after=3600):
+        super(ComputeNodeMonitorActor, self).__init__()
+        self._later = self.actor_ref.proxy()
+        self._logger = logging.getLogger('arvnodeman.computenode')
+        self._last_log = None
+        self._shutdowns = shutdown_timer
+        self._timer = timer_actor
+        self._update = update_actor
+        self.cloud_node = cloud_node
+        self.cloud_node_start_time = cloud_node_start_time
+        self.poll_stale_after = poll_stale_after
+        self.node_stale_after = node_stale_after
+        self.subscribers = set()
+        self.arvados_node = None
+        self._later.update_arvados_node(arvados_node)
+        self.last_shutdown_opening = None
+        self._later.consider_shutdown()
+
+    def subscribe(self, subscriber):
+        self.subscribers.add(subscriber)
+
+    def _debug(self, msg, *args):
+        if msg == self._last_log:
+            return
+        self._last_log = msg
+        self._logger.debug(msg, *args)
+
+    def in_state(self, *states):
+        # Return a boolean to say whether or not our Arvados node record is in
+        # one of the given states.  If state information is not
+        # available--because this node has no Arvados record, the record is
+        # stale, or the record has no state information--return None.
+        if (self.arvados_node is None) or not timestamp_fresh(
+              arvados_node_mtime(self.arvados_node), self.node_stale_after):
+            return None
+        state = self.arvados_node['info'].get('slurm_state')
+        if not state:
+            return None
+        result = state in states
+        if state == 'idle':
+            result = result and not self.arvados_node['job_uuid']
+        return result
+
+    def _shutdown_eligible(self):
+        if self.arvados_node is None:
+            # If this is a new, unpaired node, it's eligible for
+            # shutdown--we figure there was an error during bootstrap.
+            return timestamp_fresh(self.cloud_node_start_time,
+                                   self.node_stale_after)
+        else:
+            return self.in_state('idle')
+
+    def consider_shutdown(self):
+        next_opening = self._shutdowns.next_opening()
+        if self._shutdowns.window_open():
+            if self._shutdown_eligible():
+                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+                _notify_subscribers(self._later, self.subscribers)
+            else:
+                self._debug("Node %s shutdown window open but node busy.",
+                            self.cloud_node.id)
+        else:
+            self._debug("Node %s shutdown window closed.  Next at %s.",
+                        self.cloud_node.id, time.ctime(next_opening))
+        if self.last_shutdown_opening != next_opening:
+            self._timer.schedule(next_opening, self._later.consider_shutdown)
+            self.last_shutdown_opening = next_opening
+
+    def offer_arvados_pair(self, arvados_node):
+        if self.arvados_node is not None:
+            return None
+        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+            self._later.update_arvados_node(arvados_node)
+            return self.cloud_node.id
+        else:
+            return None
+
+    def update_cloud_node(self, cloud_node):
+        if cloud_node is not None:
+            self.cloud_node = cloud_node
+            self._later.consider_shutdown()
+
+    def update_arvados_node(self, arvados_node):
+        if arvados_node is not None:
+            self.arvados_node = arvados_node
+            new_hostname = arvados_node_fqdn(self.arvados_node)
+            if new_hostname != self.cloud_node.name:
+                self._update.sync_node(self.cloud_node, self.arvados_node)
+            self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/driver/__init__.py b/services/nodemanager/arvnodeman/computenode/driver/__init__.py
new file mode 100644 (file)
index 0000000..a20cfde
--- /dev/null
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+class BaseComputeNodeDriver(object):
+    """Abstract base class for compute node drivers.
+
+    libcloud abstracts away many of the differences between cloud providers,
+    but managing compute nodes requires some cloud-specific features (e.g.,
+    on EC2 we use tags to identify compute nodes).  Compute node drivers
+    are responsible for translating the node manager's cloud requests to a
+    specific cloud's vocabulary.
+
+    Subclasses must implement arvados_create_kwargs (to update node
+    creation kwargs with information about the specific Arvados node
+    record), sync_node, and node_start_time.
+    """
+    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+        self.real = driver_class(**auth_kwargs)
+        self.list_kwargs = list_kwargs
+        self.create_kwargs = create_kwargs
+
+    def __getattr__(self, name):
+        # Proxy non-extension methods to the real driver.
+        if (not name.startswith('_') and not name.startswith('ex_')
+              and hasattr(self.real, name)):
+            return getattr(self.real, name)
+        else:
+            return super(BaseComputeNodeDriver, self).__getattr__(name)
+
+    def search_for(self, term, list_method, key=lambda item: item.id):
+        cache_key = (list_method, term)
+        if cache_key not in self.SEARCH_CACHE:
+            results = [item for item in getattr(self.real, list_method)()
+                       if key(item) == term]
+            count = len(results)
+            if count != 1:
+                raise ValueError("{} returned {} results for '{}'".format(
+                        list_method, count, term))
+            self.SEARCH_CACHE[cache_key] = results[0]
+        return self.SEARCH_CACHE[cache_key]
+
+    def list_nodes(self):
+        return self.real.list_nodes(**self.list_kwargs)
+
+    def arvados_create_kwargs(self, arvados_node):
+        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
+
+    def create_node(self, size, arvados_node):
+        kwargs = self.create_kwargs.copy()
+        kwargs.update(self.arvados_create_kwargs(arvados_node))
+        kwargs['size'] = size
+        return self.real.create_node(**kwargs)
+
+    def sync_node(self, cloud_node, arvados_node):
+        # When a compute node first pings the API server, the API server
+        # will automatically assign some attributes on the corresponding
+        # node record, like hostname.  This method should propagate that
+        # information back to the cloud node appropriately.
+        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
+
+    @classmethod
+    def node_start_time(cls, node):
+        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
similarity index 96%
rename from services/nodemanager/arvnodeman/computenode/dummy.py
rename to services/nodemanager/arvnodeman/computenode/driver/dummy.py
index 6c39feaf614d9feaf84bc59a003f5ada657250a6..3a286bba017d3b9fd8999fe7185a90d197917df8 100644 (file)
@@ -7,7 +7,8 @@ import time
 import libcloud.compute.providers as cloud_provider
 import libcloud.compute.types as cloud_types
 
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
 
 class ComputeNodeDriver(BaseComputeNodeDriver):
     """Compute node driver wrapper for libcloud's dummy driver.
similarity index 98%
rename from services/nodemanager/arvnodeman/computenode/ec2.py
rename to services/nodemanager/arvnodeman/computenode/driver/ec2.py
index 359bed4d1cb76fc557647e18395f9497134b66a3..c0992f7b9635e2c47edd8e67cf5b887ba5692718 100644 (file)
@@ -9,7 +9,8 @@ import libcloud.compute.providers as cloud_provider
 import libcloud.compute.types as cloud_types
 from libcloud.compute.drivers import ec2 as cloud_ec2
 
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
 
 ### Monkeypatch libcloud to support AWS' new SecurityGroup API.
 # These classes can be removed when libcloud support specifying
index 754b9319f0797d864ab0c9eb50107054485e6eed..24fd828cf5b2a20b9bc91ffd906d464a155eabc6 100644 (file)
@@ -85,7 +85,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                            http=http)
 
     def new_cloud_client(self):
-        module = importlib.import_module('arvnodeman.computenode.' +
+        module = importlib.import_module('arvnodeman.computenode.driver.' +
                                          self.get('Cloud', 'provider'))
         auth_kwargs = self.get_section('Cloud Credentials')
         if 'timeout' in auth_kwargs:
index eaf10be324de16811e655e2b8427bed85ad6709d..7ff736fb86dcd075e0c9c3777479a746e1767ea7 100644 (file)
@@ -9,6 +9,7 @@ import time
 import pykka
 
 from . import computenode as cnode
+from .computenode import dispatch
 from .config import actor_class
 
 class _ComputeNodeRecord(object):
@@ -96,9 +97,9 @@ class NodeManagerDaemonActor(actor_class):
                  arvados_factory, cloud_factory,
                  shutdown_windows, min_nodes, max_nodes,
                  poll_stale_after=600, node_stale_after=7200,
-                 node_setup_class=cnode.ComputeNodeSetupActor,
-                 node_shutdown_class=cnode.ComputeNodeShutdownActor,
-                 node_actor_class=cnode.ComputeNodeMonitorActor):
+                 node_setup_class=dispatch.ComputeNodeSetupActor,
+                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
+                 node_actor_class=dispatch.ComputeNodeMonitorActor):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
index f4ad7163fa2ac0ef445d5feb8317f412e07401b4..d2f4afee061e26fd19085575f7b5bef33c977efd 100644 (file)
@@ -12,9 +12,7 @@ import daemon
 import pykka
 
 from . import config as nmconfig
-from .computenode import \
-    ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
-    ShutdownTimer
+from .computenode.dispatch import ComputeNodeUpdateActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
index 5ced5f99bbc6a768f9817b7ec364dfc550354541..e22cccc5b0193b1949297fdae1fdd9fa24a4836b 100644 (file)
@@ -6,137 +6,11 @@ import time
 import unittest
 
 import arvados.errors as arverror
-import httplib2
 import mock
-import pykka
 
 import arvnodeman.computenode as cnode
 from . import testutil
 
-class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
-    def make_mocks(self, arvados_effect=None, cloud_effect=None):
-        if arvados_effect is None:
-            arvados_effect = [testutil.arvados_node_mock()]
-        self.arvados_effect = arvados_effect
-        self.timer = testutil.MockTimer()
-        self.api_client = mock.MagicMock(name='api_client')
-        self.api_client.nodes().create().execute.side_effect = arvados_effect
-        self.api_client.nodes().update().execute.side_effect = arvados_effect
-        self.cloud_client = mock.MagicMock(name='cloud_client')
-        self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
-
-    def make_actor(self, arv_node=None):
-        if not hasattr(self, 'timer'):
-            self.make_mocks(arvados_effect=[arv_node])
-        self.setup_actor = cnode.ComputeNodeSetupActor.start(
-            self.timer, self.api_client, self.cloud_client,
-            testutil.MockSize(1), arv_node).proxy()
-
-    def test_creation_without_arvados_node(self):
-        self.make_actor()
-        self.assertEqual(self.arvados_effect[-1],
-                         self.setup_actor.arvados_node.get(self.TIMEOUT))
-        self.assertTrue(self.api_client.nodes().create().execute.called)
-        self.assertEqual(self.cloud_client.create_node(),
-                         self.setup_actor.cloud_node.get(self.TIMEOUT))
-
-    def test_creation_with_arvados_node(self):
-        self.make_actor(testutil.arvados_node_mock())
-        self.assertEqual(self.arvados_effect[-1],
-                         self.setup_actor.arvados_node.get(self.TIMEOUT))
-        self.assertTrue(self.api_client.nodes().update().execute.called)
-        self.assertEqual(self.cloud_client.create_node(),
-                         self.setup_actor.cloud_node.get(self.TIMEOUT))
-
-    def test_failed_calls_retried(self):
-        self.make_mocks([
-                arverror.ApiError(httplib2.Response({'status': '500'}), ""),
-                testutil.arvados_node_mock(),
-                ])
-        self.make_actor()
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-
-    def test_stop_when_no_cloud_node(self):
-        self.make_mocks(
-            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
-        self.make_actor()
-        self.setup_actor.stop_if_no_cloud_node()
-        self.assertTrue(
-            self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
-
-    def test_no_stop_when_cloud_node(self):
-        self.make_actor()
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
-        self.assertTrue(self.stop_proxy(self.setup_actor),
-                        "actor was stopped by stop_if_no_cloud_node")
-
-    def test_subscribe(self):
-        self.make_mocks(
-            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
-        self.make_actor()
-        subscriber = mock.Mock(name='subscriber_mock')
-        self.setup_actor.subscribe(subscriber)
-        self.api_client.nodes().create().execute.side_effect = [
-            testutil.arvados_node_mock()]
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
-                         subscriber.call_args[0][0].actor_ref.actor_urn)
-
-    def test_late_subscribe(self):
-        self.make_actor()
-        subscriber = mock.Mock(name='subscriber_mock')
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
-        self.stop_proxy(self.setup_actor)
-        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
-                         subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
-                                       unittest.TestCase):
-    def make_mocks(self, cloud_node=None):
-        self.timer = testutil.MockTimer()
-        self.cloud_client = mock.MagicMock(name='cloud_client')
-        if cloud_node is None:
-            cloud_node = testutil.cloud_node_mock()
-        self.cloud_node = cloud_node
-
-    def make_actor(self, arv_node=None):
-        if not hasattr(self, 'timer'):
-            self.make_mocks()
-        self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
-            self.timer, self.cloud_client, self.cloud_node).proxy()
-
-    def test_easy_shutdown(self):
-        self.make_actor()
-        self.shutdown_actor.cloud_node.get(self.TIMEOUT)
-        self.stop_proxy(self.shutdown_actor)
-        self.assertTrue(self.cloud_client.destroy_node.called)
-
-    def test_late_subscribe(self):
-        self.make_actor()
-        subscriber = mock.Mock(name='subscriber_mock')
-        self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
-        self.stop_proxy(self.shutdown_actor)
-        self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
-                         subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
-                                     unittest.TestCase):
-    def make_actor(self):
-        self.driver = mock.MagicMock(name='driver_mock')
-        self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()
-
-    def test_node_sync(self):
-        self.make_actor()
-        cloud_node = testutil.cloud_node_mock()
-        arv_node = testutil.arvados_node_mock()
-        self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
-        self.driver().sync_node.assert_called_with(cloud_node, arv_node)
-
-
 @mock.patch('time.time', return_value=1)
 class ShutdownTimerTestCase(unittest.TestCase):
     def test_two_length_window(self, time_mock):
@@ -160,156 +34,3 @@ class ShutdownTimerTestCase(unittest.TestCase):
         time_mock.return_value += 200
         self.assertEqual(961, timer.next_opening())
         self.assertFalse(timer.window_open())
-
-
-class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
-                                      unittest.TestCase):
-    class MockShutdownTimer(object):
-        def _set_state(self, is_open, next_opening):
-            self.window_open = lambda: is_open
-            self.next_opening = lambda: next_opening
-
-
-    def make_mocks(self, node_num):
-        self.shutdowns = self.MockShutdownTimer()
-        self.shutdowns._set_state(False, 300)
-        self.timer = mock.MagicMock(name='timer_mock')
-        self.updates = mock.MagicMock(name='update_mock')
-        self.cloud_mock = testutil.cloud_node_mock(node_num)
-        self.subscriber = mock.Mock(name='subscriber_mock')
-
-    def make_actor(self, node_num=1, arv_node=None, start_time=None):
-        if not hasattr(self, 'cloud_mock'):
-            self.make_mocks(node_num)
-        if start_time is None:
-            start_time = time.time()
-        self.node_actor = cnode.ComputeNodeMonitorActor.start(
-            self.cloud_mock, start_time, self.shutdowns, self.timer,
-            self.updates, arv_node).proxy()
-        self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
-
-    def node_state(self, *states):
-        return self.node_actor.in_state(*states).get(self.TIMEOUT)
-
-    def test_in_state_when_unpaired(self):
-        self.make_actor()
-        self.assertIsNone(self.node_state('idle', 'alloc'))
-
-    def test_in_state_when_pairing_stale(self):
-        self.make_actor(arv_node=testutil.arvados_node_mock(
-                job_uuid=None, age=90000))
-        self.assertIsNone(self.node_state('idle', 'alloc'))
-
-    def test_in_state_when_no_state_available(self):
-        self.make_actor(arv_node=testutil.arvados_node_mock(info={}))
-        self.assertIsNone(self.node_state('idle', 'alloc'))
-
-    def test_in_idle_state(self):
-        self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
-        self.assertTrue(self.node_state('idle'))
-        self.assertFalse(self.node_state('alloc'))
-        self.assertTrue(self.node_state('idle', 'alloc'))
-
-    def test_in_alloc_state(self):
-        self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
-        self.assertFalse(self.node_state('idle'))
-        self.assertTrue(self.node_state('alloc'))
-        self.assertTrue(self.node_state('idle', 'alloc'))
-
-    def test_init_shutdown_scheduling(self):
-        self.make_actor()
-        self.assertTrue(self.timer.schedule.called)
-        self.assertEqual(300, self.timer.schedule.call_args[0][0])
-
-    def test_shutdown_subscription(self):
-        self.make_actor()
-        self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertTrue(self.subscriber.called)
-        self.assertEqual(self.node_actor.actor_ref.actor_urn,
-                         self.subscriber.call_args[0][0].actor_ref.actor_urn)
-
-    def test_shutdown_without_arvados_node(self):
-        self.make_actor()
-        self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertTrue(self.subscriber.called)
-
-    def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
-        self.make_actor(start_time=0)
-        self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertFalse(self.subscriber.called)
-
-    def check_shutdown_rescheduled(self, window_open, next_window,
-                                   schedule_time=None):
-        self.shutdowns._set_state(window_open, next_window)
-        self.timer.schedule.reset_mock()
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.stop_proxy(self.node_actor)
-        self.assertTrue(self.timer.schedule.called)
-        if schedule_time is not None:
-            self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
-        self.assertFalse(self.subscriber.called)
-
-    def test_shutdown_window_close_scheduling(self):
-        self.make_actor()
-        self.check_shutdown_rescheduled(False, 600, 600)
-
-    def test_no_shutdown_when_node_running_job(self):
-        self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
-        self.check_shutdown_rescheduled(True, 600)
-
-    def test_no_shutdown_when_node_state_unknown(self):
-        self.make_actor(5, testutil.arvados_node_mock(5, info={}))
-        self.check_shutdown_rescheduled(True, 600)
-
-    def test_no_shutdown_when_node_state_stale(self):
-        self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
-        self.check_shutdown_rescheduled(True, 600)
-
-    def test_arvados_node_match(self):
-        self.make_actor(2)
-        arv_node = testutil.arvados_node_mock(
-            2, hostname='compute-two.zzzzz.arvadosapi.com')
-        pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
-        self.assertEqual(self.cloud_mock.id, pair_id)
-        self.stop_proxy(self.node_actor)
-        self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
-
-    def test_arvados_node_mismatch(self):
-        self.make_actor(3)
-        arv_node = testutil.arvados_node_mock(1)
-        self.assertIsNone(
-            self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
-
-    def test_update_cloud_node(self):
-        self.make_actor(1)
-        self.make_mocks(2)
-        self.cloud_mock.id = '1'
-        self.node_actor.update_cloud_node(self.cloud_mock)
-        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
-        self.assertEqual([testutil.ip_address_mock(2)],
-                         current_cloud.private_ips)
-
-    def test_missing_cloud_node_update(self):
-        self.make_actor(1)
-        self.node_actor.update_cloud_node(None)
-        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
-        self.assertEqual([testutil.ip_address_mock(1)],
-                         current_cloud.private_ips)
-
-    def test_update_arvados_node(self):
-        self.make_actor(3)
-        job_uuid = 'zzzzz-jjjjj-updatejobnode00'
-        new_arvados = testutil.arvados_node_mock(3, job_uuid)
-        self.node_actor.update_arvados_node(new_arvados)
-        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
-        self.assertEqual(job_uuid, current_arvados['job_uuid'])
-
-    def test_missing_arvados_node_update(self):
-        self.make_actor(4, testutil.arvados_node_mock(4))
-        self.node_actor.update_arvados_node(None)
-        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
-        self.assertEqual(testutil.ip_address_mock(4),
-                         current_arvados['ip_address'])
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
new file mode 100644 (file)
index 0000000..ece186b
--- /dev/null
@@ -0,0 +1,290 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import arvados.errors as arverror
+import httplib2
+import mock
+import pykka
+
+import arvnodeman.computenode.dispatch as dispatch
+from . import testutil
+
+class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+    """Tests for dispatch.ComputeNodeSetupActor.
+
+    The Arvados API client and the cloud client are both MagicMocks.
+    TIMEOUT, wait_for_assignment and stop_proxy are not defined in this
+    class; presumably they come from testutil.ActorTestMixin.
+    """
+
+    # Build the mock timer, API client and cloud client.
+    # arvados_effect becomes the side_effect of both the node create and
+    # node update API calls; it defaults to a single mock node record.
+    # NOTE(review): cloud_effect is accepted but never used.
+    def make_mocks(self, arvados_effect=None, cloud_effect=None):
+        if arvados_effect is None:
+            arvados_effect = [testutil.arvados_node_mock()]
+        self.arvados_effect = arvados_effect
+        self.timer = testutil.MockTimer()
+        self.api_client = mock.MagicMock(name='api_client')
+        self.api_client.nodes().create().execute.side_effect = arvados_effect
+        self.api_client.nodes().update().execute.side_effect = arvados_effect
+        self.cloud_client = mock.MagicMock(name='cloud_client')
+        self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
+
+    # Start the actor under test and keep its proxy in self.setup_actor.
+    # If make_mocks() has not been called yet, it is called here with
+    # [arv_node] as the API side effect ([None] when arv_node is omitted).
+    def make_actor(self, arv_node=None):
+        if not hasattr(self, 'timer'):
+            self.make_mocks(arvados_effect=[arv_node])
+        self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+            self.timer, self.api_client, self.cloud_client,
+            testutil.MockSize(1), arv_node).proxy()
+
+    # Without an Arvados node, the actor should call the create API and
+    # end up holding the cloud node returned by cloud_client.create_node.
+    def test_creation_without_arvados_node(self):
+        self.make_actor()
+        self.assertEqual(self.arvados_effect[-1],
+                         self.setup_actor.arvados_node.get(self.TIMEOUT))
+        self.assertTrue(self.api_client.nodes().create().execute.called)
+        self.assertEqual(self.cloud_client.create_node(),
+                         self.setup_actor.cloud_node.get(self.TIMEOUT))
+
+    # With an existing Arvados node, the update API should be called.
+    def test_creation_with_arvados_node(self):
+        self.make_actor(testutil.arvados_node_mock())
+        self.assertEqual(self.arvados_effect[-1],
+                         self.setup_actor.arvados_node.get(self.TIMEOUT))
+        self.assertTrue(self.api_client.nodes().update().execute.called)
+        self.assertEqual(self.cloud_client.create_node(),
+                         self.setup_actor.cloud_node.get(self.TIMEOUT))
+
+    # The first API call raises a 500 ApiError and the second succeeds;
+    # the test then waits for cloud_node via wait_for_assignment.
+    def test_failed_calls_retried(self):
+        self.make_mocks([
+                arverror.ApiError(httplib2.Response({'status': '500'}), ""),
+                testutil.arvados_node_mock(),
+                ])
+        self.make_actor()
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+
+    # The side effect is a single exception instance rather than a list,
+    # so the mock raises it on every API call.  stop_if_no_cloud_node()
+    # is then expected to stop the actor within TIMEOUT.
+    def test_stop_when_no_cloud_node(self):
+        self.make_mocks(
+            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+        self.make_actor()
+        self.setup_actor.stop_if_no_cloud_node()
+        self.assertTrue(
+            self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
+
+    # Once a cloud node is assigned, stop_if_no_cloud_node() should leave
+    # the actor running.  NOTE(review): this relies on stop_proxy
+    # returning a true value only when the actor was still running --
+    # confirm in testutil.
+    def test_no_stop_when_cloud_node(self):
+        self.make_actor()
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
+        self.assertTrue(self.stop_proxy(self.setup_actor),
+                        "actor was stopped by stop_if_no_cloud_node")
+
+    # Subscribe while the API is still failing, then let the create call
+    # succeed.  The subscriber should be called with an object whose
+    # actor_ref is the setup actor's.
+    def test_subscribe(self):
+        self.make_mocks(
+            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.setup_actor.subscribe(subscriber)
+        self.api_client.nodes().create().execute.side_effect = [
+            testutil.arvados_node_mock()]
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+    # A subscriber added after the cloud node is assigned should still
+    # be called.
+    def test_late_subscribe(self):
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.setup_actor)
+        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
+class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
+                                       unittest.TestCase):
+    """Tests for dispatch.ComputeNodeShutdownActor with a mock cloud client."""
+
+    # Build the mock timer and cloud client, and choose the cloud node
+    # to shut down (a fresh mock node unless one is given).
+    def make_mocks(self, cloud_node=None):
+        self.timer = testutil.MockTimer()
+        self.cloud_client = mock.MagicMock(name='cloud_client')
+        if cloud_node is None:
+            cloud_node = testutil.cloud_node_mock()
+        self.cloud_node = cloud_node
+
+    # Start the actor under test and keep its proxy in
+    # self.shutdown_actor.
+    # NOTE(review): arv_node is accepted but never used.
+    def make_actor(self, arv_node=None):
+        if not hasattr(self, 'timer'):
+            self.make_mocks()
+        self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
+            self.timer, self.cloud_client, self.cloud_node).proxy()
+
+    # After the actor has started and been stopped, the cloud client's
+    # destroy_node should have been called.
+    def test_easy_shutdown(self):
+        self.make_actor()
+        # Reading an attribute through the proxy blocks until the actor
+        # answers (or TIMEOUT passes).
+        self.shutdown_actor.cloud_node.get(self.TIMEOUT)
+        self.stop_proxy(self.shutdown_actor)
+        self.assertTrue(self.cloud_client.destroy_node.called)
+
+    # A subscriber added after the actor has started should be called
+    # with an object whose actor_ref is the shutdown actor's.
+    def test_late_subscribe(self):
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.shutdown_actor)
+        self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
+class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
+                                     unittest.TestCase):
+    """Tests for dispatch.ComputeNodeUpdateActor with a mock driver."""
+
+    # Start the actor with a MagicMock in place of the driver and keep
+    # its proxy in self.updater.
+    def make_actor(self):
+        self.driver = mock.MagicMock(name='driver_mock')
+        self.updater = dispatch.ComputeNodeUpdateActor.start(self.driver).proxy()
+
+    # sync_node() on the actor should reach the driver with the same
+    # arguments.  The assertion is made on self.driver(), the mock's
+    # return value, so the actor is expected to call the object it was
+    # given and use the result as its driver.
+    def test_node_sync(self):
+        self.make_actor()
+        cloud_node = testutil.cloud_node_mock()
+        arv_node = testutil.arvados_node_mock()
+        self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
+        self.driver().sync_node.assert_called_with(cloud_node, arv_node)
+
+
+class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+                                      unittest.TestCase):
+    """Tests for dispatch.ComputeNodeMonitorActor.
+
+    The shutdown timer, scheduling timer, update actor, cloud node and
+    subscriber are all test doubles built in make_mocks().
+    """
+
+    # Stand-in for the shutdown timer.  _set_state() replaces
+    # window_open() and next_opening() with functions that return the
+    # given values.
+    class MockShutdownTimer(object):
+        def _set_state(self, is_open, next_opening):
+            self.window_open = lambda: is_open
+            self.next_opening = lambda: next_opening
+
+
+    # Build fresh test doubles.  The shutdown window starts closed with
+    # a next opening of 300.
+    def make_mocks(self, node_num):
+        self.shutdowns = self.MockShutdownTimer()
+        self.shutdowns._set_state(False, 300)
+        self.timer = mock.MagicMock(name='timer_mock')
+        self.updates = mock.MagicMock(name='update_mock')
+        self.cloud_mock = testutil.cloud_node_mock(node_num)
+        self.subscriber = mock.Mock(name='subscriber_mock')
+
+    # Start the actor under test, keep its proxy in self.node_actor and
+    # subscribe self.subscriber to it.  start_time defaults to the
+    # current time.
+    def make_actor(self, node_num=1, arv_node=None, start_time=None):
+        if not hasattr(self, 'cloud_mock'):
+            self.make_mocks(node_num)
+        if start_time is None:
+            start_time = time.time()
+        self.node_actor = dispatch.ComputeNodeMonitorActor.start(
+            self.cloud_mock, start_time, self.shutdowns, self.timer,
+            self.updates, arv_node).proxy()
+        self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
+
+    # Return the actor's in_state(*states) result.
+    def node_state(self, *states):
+        return self.node_actor.in_state(*states).get(self.TIMEOUT)
+
+    # in_state() should return None when there is no Arvados node.
+    def test_in_state_when_unpaired(self):
+        self.make_actor()
+        self.assertIsNone(self.node_state('idle', 'alloc'))
+
+    # in_state() should return None when the Arvados record was mocked
+    # with age=90000.
+    def test_in_state_when_pairing_stale(self):
+        self.make_actor(arv_node=testutil.arvados_node_mock(
+                job_uuid=None, age=90000))
+        self.assertIsNone(self.node_state('idle', 'alloc'))
+
+    # in_state() should return None when the record's info is empty.
+    def test_in_state_when_no_state_available(self):
+        self.make_actor(arv_node=testutil.arvados_node_mock(info={}))
+        self.assertIsNone(self.node_state('idle', 'alloc'))
+
+    # A node mocked with job_uuid=None should count as 'idle', not
+    # 'alloc'.
+    def test_in_idle_state(self):
+        self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
+        self.assertTrue(self.node_state('idle'))
+        self.assertFalse(self.node_state('alloc'))
+        self.assertTrue(self.node_state('idle', 'alloc'))
+
+    # A node mocked with job_uuid=True should count as 'alloc', not
+    # 'idle'.
+    def test_in_alloc_state(self):
+        self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
+        self.assertFalse(self.node_state('idle'))
+        self.assertTrue(self.node_state('alloc'))
+        self.assertTrue(self.node_state('idle', 'alloc'))
+
+    # Starting the actor should schedule something on the timer for 300,
+    # the next opening set in make_mocks().
+    def test_init_shutdown_scheduling(self):
+        self.make_actor()
+        self.assertTrue(self.timer.schedule.called)
+        self.assertEqual(300, self.timer.schedule.call_args[0][0])
+
+    # With the shutdown window open, consider_shutdown() should call the
+    # subscriber with an object whose actor_ref is the monitor actor's.
+    def test_shutdown_subscription(self):
+        self.make_actor()
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertTrue(self.subscriber.called)
+        self.assertEqual(self.node_actor.actor_ref.actor_urn,
+                         self.subscriber.call_args[0][0].actor_ref.actor_urn)
+
+    # A node with no Arvados record and the default start time (now)
+    # should notify the subscriber when the window is open.
+    def test_shutdown_without_arvados_node(self):
+        self.make_actor()
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertTrue(self.subscriber.called)
+
+    # Same as above but with start_time=0: the subscriber should not be
+    # called.
+    def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
+        self.make_actor(start_time=0)
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertFalse(self.subscriber.called)
+
+    # Shared check: after consider_shutdown() in the given window state,
+    # the timer should have been scheduled again (for schedule_time, if
+    # given) and the subscriber should not have been called.
+    def check_shutdown_rescheduled(self, window_open, next_window,
+                                   schedule_time=None):
+        self.shutdowns._set_state(window_open, next_window)
+        # Forget the scheduling done when the actor started.
+        self.timer.schedule.reset_mock()
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.stop_proxy(self.node_actor)
+        self.assertTrue(self.timer.schedule.called)
+        if schedule_time is not None:
+            self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
+        self.assertFalse(self.subscriber.called)
+
+    # With the window closed, the next check should be scheduled for the
+    # next opening (600).
+    def test_shutdown_window_close_scheduling(self):
+        self.make_actor()
+        self.check_shutdown_rescheduled(False, 600, 600)
+
+    # A node mocked with job_uuid=True should not notify the subscriber,
+    # even with the window open.
+    def test_no_shutdown_when_node_running_job(self):
+        self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
+        self.check_shutdown_rescheduled(True, 600)
+
+    # A node whose record has empty info should not notify the
+    # subscriber.
+    def test_no_shutdown_when_node_state_unknown(self):
+        self.make_actor(5, testutil.arvados_node_mock(5, info={}))
+        self.check_shutdown_rescheduled(True, 600)
+
+    # A node whose record was mocked with age=90000 should not notify
+    # the subscriber.
+    def test_no_shutdown_when_node_state_stale(self):
+        self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
+        self.check_shutdown_rescheduled(True, 600)
+
+    # Offering an Arvados node mocked with the same number as the cloud
+    # node should return the cloud node's id and trigger a sync_node
+    # call on the update mock.
+    def test_arvados_node_match(self):
+        self.make_actor(2)
+        arv_node = testutil.arvados_node_mock(
+            2, hostname='compute-two.zzzzz.arvadosapi.com')
+        pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
+        self.assertEqual(self.cloud_mock.id, pair_id)
+        self.stop_proxy(self.node_actor)
+        self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
+
+    # Offering an Arvados node mocked with a different number should
+    # return None.
+    def test_arvados_node_mismatch(self):
+        self.make_actor(3)
+        arv_node = testutil.arvados_node_mock(1)
+        self.assertIsNone(
+            self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
+
+    # update_cloud_node() should replace the actor's cloud node record.
+    def test_update_cloud_node(self):
+        self.make_actor(1)
+        # Rebuild the mocks to get a node-2 cloud record; the actor
+        # started above still holds the earlier mocks.
+        self.make_mocks(2)
+        self.cloud_mock.id = '1'
+        self.node_actor.update_cloud_node(self.cloud_mock)
+        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+        self.assertEqual([testutil.ip_address_mock(2)],
+                         current_cloud.private_ips)
+
+    # update_cloud_node(None) should leave the existing record in place.
+    def test_missing_cloud_node_update(self):
+        self.make_actor(1)
+        self.node_actor.update_cloud_node(None)
+        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+        self.assertEqual([testutil.ip_address_mock(1)],
+                         current_cloud.private_ips)
+
+    # update_arvados_node() should replace the actor's Arvados record;
+    # the new record's job_uuid should be visible afterwards.
+    def test_update_arvados_node(self):
+        self.make_actor(3)
+        job_uuid = 'zzzzz-jjjjj-updatejobnode00'
+        new_arvados = testutil.arvados_node_mock(3, job_uuid)
+        self.node_actor.update_arvados_node(new_arvados)
+        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+        self.assertEqual(job_uuid, current_arvados['job_uuid'])
+
+    # update_arvados_node(None) should leave the existing record in
+    # place.
+    def test_missing_arvados_node_update(self):
+        self.make_actor(4, testutil.arvados_node_mock(4))
+        self.node_actor.update_arvados_node(None)
+        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+        self.assertEqual(testutil.ip_address_mock(4),
+                         current_arvados['ip_address'])
similarity index 98%
rename from services/nodemanager/tests/test_computenode_ec2.py
rename to services/nodemanager/tests/test_computenode_driver_ec2.py
index d1c9e43fe47b9968bd13d28c1244f8b7e2efb879..fde103e10e606f68ca5e0b3ba262f0a350e6df64 100644 (file)
@@ -7,7 +7,7 @@ import unittest
 
 import mock
 
-import arvnodeman.computenode.ec2 as ec2
+import arvnodeman.computenode.driver.ec2 as ec2
 from . import testutil
 
 class EC2ComputeNodeDriverTestCase(unittest.TestCase):
index 4bffd092a164c75824453f08e00f3ea755315fcb..394fb88b80cf6ff10efc04fe79839c2eb42954ea 100644 (file)
@@ -8,8 +8,8 @@ import unittest
 import mock
 import pykka
 
-import arvnodeman.computenode as nmcnode
 import arvnodeman.daemon as nmdaemon
+from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
@@ -39,7 +39,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
 
     def monitor_list(self):
-        return pykka.ActorRegistry.get_by_class(nmcnode.ComputeNodeMonitorActor)
+        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
 
     def alive_monitor_count(self):
         return sum(1 for actor in self.monitor_list() if actor.is_alive())