Merge branch 'master' into 4232-slow-pipes-n-jobs
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
index ae25428944dd8a967de0f60b6462197acfff4fb2..f5186074c6bc30c9d2230e91ea12764daddb797b 100644 (file)
 
 from __future__ import absolute_import, print_function
 
-import functools
+import calendar
 import itertools
-import logging
+import re
 import time
 
-import pykka
-
-from ..clientactor import _notify_subscribers
-from .. import config
+# strptime format for whole-second UTC time strings such as a node
+# record's 'modified_at' value.
+ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
+# Matches a fractional-seconds part ('.' plus digits) that sits directly
+# before the trailing 'Z'; group 1 is the fraction including the dot.
+ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
 
 def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
     hostname = arvados_node.get('hostname') or default_hostname
     return '{}.{}'.format(hostname, arvados_node['domain'])
 
 def arvados_node_mtime(node):
-    return time.mktime(time.strptime(node['modified_at'] + 'UTC',
-                                     '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
+    """Return the node record's 'modified_at' value as a timestamp."""
+    return arvados_timestamp(node['modified_at'])
+
+def arvados_timestamp(timestr):
+    """Convert a UTC time string to seconds since the epoch.
+
+    timestr must match ARVADOS_TIMEFMT, optionally with a fractional
+    seconds part right before the trailing 'Z' (for example
+    '2015-01-01T00:00:00.5Z').  The fraction is stripped before parsing,
+    because ARVADOS_TIMEFMT has no directive for it, and added back to
+    the result so sub-second precision is preserved.
+
+    Returns a float.  time.strptime raises ValueError if timestr does
+    not match the expected format.
+    """
+    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
+    if subsec_match is None:
+        subsecs = .0
+    else:
+        subsecs = float(subsec_match.group(1))
+        timestr = timestr[:subsec_match.start()] + 'Z'
+    # calendar.timegm interprets the parsed struct_time as UTC.
+    return calendar.timegm(time.strptime(timestr + 'UTC',
+                                         ARVADOS_TIMEFMT + '%Z')) + subsecs
 
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
 
-def _retry(errors):
-    """Retry decorator for an actor method that makes remote requests.
-
-    Use this function to decorator an actor method, and pass in a tuple of
-    exceptions to catch.  This decorator will schedule retries of that method
-    with exponential backoff if the original method raises any of the given
-    errors.
-    """
-    def decorator(orig_func):
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            try:
-                orig_func(self, *args, **kwargs)
-            except errors as error:
-                self._logger.warning(
-                    "Client error: %s - waiting %s seconds",
-                    error, self.retry_wait)
-                self._timer.schedule(self.retry_wait,
-                                     getattr(self._later, orig_func.__name__),
-                                     *args, **kwargs)
-                self.retry_wait = min(self.retry_wait * 2,
-                                      self.max_retry_wait)
-            else:
-                self.retry_wait = self.min_retry_wait
-        return wrapper
-    return decorator
-
-class BaseComputeNodeDriver(object):
-    """Abstract base class for compute node drivers.
-
-    libcloud abstracts away many of the differences between cloud providers,
-    but managing compute nodes requires some cloud-specific features (e.g.,
-    on EC2 we use tags to identify compute nodes).  Compute node drivers
-    are responsible for translating the node manager's cloud requests to a
-    specific cloud's vocabulary.
-
-    Subclasses must implement arvados_create_kwargs (to update node
-    creation kwargs with information about the specific Arvados node
-    record), sync_node, and node_start_time.
-    """
-    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
-        self.real = driver_class(**auth_kwargs)
-        self.list_kwargs = list_kwargs
-        self.create_kwargs = create_kwargs
-
-    def __getattr__(self, name):
-        # Proxy non-extension methods to the real driver.
-        if (not name.startswith('_') and not name.startswith('ex_')
-              and hasattr(self.real, name)):
-            return getattr(self.real, name)
-        else:
-            return super(BaseComputeNodeDriver, self).__getattr__(name)
-
-    def search_for(self, term, list_method, key=lambda item: item.id):
-        cache_key = (list_method, term)
-        if cache_key not in self.SEARCH_CACHE:
-            results = [item for item in getattr(self.real, list_method)()
-                       if key(item) == term]
-            count = len(results)
-            if count != 1:
-                raise ValueError("{} returned {} results for '{}'".format(
-                        list_method, count, term))
-            self.SEARCH_CACHE[cache_key] = results[0]
-        return self.SEARCH_CACHE[cache_key]
-
-    def list_nodes(self):
-        return self.real.list_nodes(**self.list_kwargs)
-
-    def arvados_create_kwargs(self, arvados_node):
-        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
-
-    def create_node(self, size, arvados_node):
-        kwargs = self.create_kwargs.copy()
-        kwargs.update(self.arvados_create_kwargs(arvados_node))
-        kwargs['size'] = size
-        return self.real.create_node(**kwargs)
-
-    def sync_node(self, cloud_node, arvados_node):
-        # When a compute node first pings the API server, the API server
-        # will automatically assign some attributes on the corresponding
-        # node record, like hostname.  This method should propagate that
-        # information back to the cloud node appropriately.
-        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
-
-    @classmethod
-    def node_start_time(cls, node):
-        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
-
-
-ComputeNodeDriverClass = BaseComputeNodeDriver
-
-class ComputeNodeSetupActor(config.actor_class):
-    """Actor to create and set up a cloud compute node.
-
-    This actor prepares an Arvados node record for a new compute node
-    (either creating one or cleaning one passed in), then boots the
-    actual compute node.  It notifies subscribers when the cloud node
-    is successfully created (the last step in the process for Node
-    Manager to handle).
-    """
-    def __init__(self, timer_actor, arvados_client, cloud_client,
-                 cloud_size, arvados_node=None,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeSetupActor, self).__init__()
-        self._timer = timer_actor
-        self._arvados = arvados_client
-        self._cloud = cloud_client
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.nodeup')
-        self.cloud_size = cloud_size
-        self.subscribers = set()
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
-        self.arvados_node = None
-        self.cloud_node = None
-        if arvados_node is None:
-            self._later.create_arvados_node()
-        else:
-            self._later.prepare_arvados_node(arvados_node)
-
-    @_retry(config.ARVADOS_ERRORS)
-    def create_arvados_node(self):
-        self.arvados_node = self._arvados.nodes().create(body={}).execute()
-        self._later.create_cloud_node()
-
-    @_retry(config.ARVADOS_ERRORS)
-    def prepare_arvados_node(self, node):
-        self.arvados_node = self._arvados.nodes().update(
-            uuid=node['uuid'],
-            body={'hostname': None,
-                  'ip_address': None,
-                  'slot_number': None,
-                  'first_ping_at': None,
-                  'last_ping_at': None,
-                  'info': {'ec2_instance_id': None,
-                           'last_action': "Prepared by Node Manager"}}
-            ).execute()
-        self._later.create_cloud_node()
-
-    @_retry(config.CLOUD_ERRORS)
-    def create_cloud_node(self):
-        self._logger.info("Creating cloud node with size %s.",
-                          self.cloud_size.name)
-        self.cloud_node = self._cloud.create_node(self.cloud_size,
-                                                  self.arvados_node)
-        self._logger.info("Cloud node %s created.", self.cloud_node.id)
-        _notify_subscribers(self._later, self.subscribers)
-        self.subscribers = None
-
-    def stop_if_no_cloud_node(self):
-        if self.cloud_node is None:
-            self.stop()
-
-    def subscribe(self, subscriber):
-        if self.subscribers is None:
-            try:
-                subscriber(self._later)
-            except pykka.ActorDeadError:
-                pass
-        else:
-            self.subscribers.add(subscriber)
-
-
-class ComputeNodeShutdownActor(config.actor_class):
-    """Actor to shut down a compute node.
-
-    This actor simply destroys a cloud node, retrying as needed.
-    """
-    def __init__(self, timer_actor, cloud_client, cloud_node,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeShutdownActor, self).__init__()
-        self._timer = timer_actor
-        self._cloud = cloud_client
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.nodedown')
-        self.cloud_node = cloud_node
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
-        self._later.shutdown_node()
-
-    @_retry(config.CLOUD_ERRORS)
-    def shutdown_node(self):
-        self._cloud.destroy_node(self.cloud_node)
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
-
-
-class ComputeNodeUpdateActor(config.actor_class):
-    """Actor to dispatch one-off cloud management requests.
-
-    This actor receives requests for small cloud updates, and
-    dispatches them to a real driver.  ComputeNodeMonitorActors use
-    this to perform maintenance tasks on themselves.  Having a
-    dedicated actor for this gives us the opportunity to control the
-    flow of requests; e.g., by backing off when errors occur.
-
-    This actor is most like a "traditional" Pykka actor: there's no
-    subscribing, but instead methods return real driver results.  If
-    you're interested in those results, you should get them from the
-    Future that the proxy method returns.  Be prepared to handle exceptions
-    from the cloud driver when you do.
-    """
-    def __init__(self, cloud_factory, max_retry_wait=180):
-        super(ComputeNodeUpdateActor, self).__init__()
-        self._cloud = cloud_factory()
-        self.max_retry_wait = max_retry_wait
-        self.error_streak = 0
-        self.next_request_time = time.time()
-
-    def _throttle_errors(orig_func):
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            throttle_time = self.next_request_time - time.time()
-            if throttle_time > 0:
-                time.sleep(throttle_time)
-            self.next_request_time = time.time()
-            try:
-                result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
-                self.error_streak += 1
-                self.next_request_time += min(2 ** self.error_streak,
-                                              self.max_retry_wait)
-                raise
-            else:
-                self.error_streak = 0
-                return result
-        return wrapper
-
-    @_throttle_errors
-    def sync_node(self, cloud_node, arvados_node):
-        return self._cloud.sync_node(cloud_node, arvados_node)
-
-
 class ShutdownTimer(object):
     """Keep track of a cloud node's shutdown windows.
 
@@ -297,87 +70,3 @@ class ShutdownTimer(object):
     def window_open(self):
         self._advance_opening()
         return 0 < (time.time() - self._open_start) < self._open_for
-
-
-class ComputeNodeMonitorActor(config.actor_class):
-    """Actor to manage a running compute node.
-
-    This actor gets updates about a compute node's cloud and Arvados records.
-    It uses this information to notify subscribers when the node is eligible
-    for shutdown.
-    """
-    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 timer_actor, update_actor, arvados_node=None,
-                 poll_stale_after=600, node_stale_after=3600):
-        super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.computenode')
-        self._last_log = None
-        self._shutdowns = shutdown_timer
-        self._timer = timer_actor
-        self._update = update_actor
-        self.cloud_node = cloud_node
-        self.cloud_node_start_time = cloud_node_start_time
-        self.poll_stale_after = poll_stale_after
-        self.node_stale_after = node_stale_after
-        self.subscribers = set()
-        self.arvados_node = None
-        self._later.update_arvados_node(arvados_node)
-        self.last_shutdown_opening = None
-        self._later.consider_shutdown()
-
-    def subscribe(self, subscriber):
-        self.subscribers.add(subscriber)
-
-    def _debug(self, msg, *args):
-        if msg == self._last_log:
-            return
-        self._last_log = msg
-        self._logger.debug(msg, *args)
-
-    def _shutdown_eligible(self):
-        if self.arvados_node is None:
-            return timestamp_fresh(self.cloud_node_start_time,
-                                   self.node_stale_after)
-        else:
-            return (timestamp_fresh(arvados_node_mtime(self.arvados_node),
-                                    self.poll_stale_after) and
-                    (self.arvados_node['info'].get('slurm_state') == 'idle'))
-
-    def consider_shutdown(self):
-        next_opening = self._shutdowns.next_opening()
-        if self._shutdowns.window_open():
-            if self._shutdown_eligible():
-                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-                _notify_subscribers(self._later, self.subscribers)
-            else:
-                self._debug("Node %s shutdown window open but node busy.",
-                            self.cloud_node.id)
-        else:
-            self._debug("Node %s shutdown window closed.  Next at %s.",
-                        self.cloud_node.id, time.ctime(next_opening))
-        if self.last_shutdown_opening != next_opening:
-            self._timer.schedule(next_opening, self._later.consider_shutdown)
-            self.last_shutdown_opening = next_opening
-
-    def offer_arvados_pair(self, arvados_node):
-        if self.arvados_node is not None:
-            return None
-        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
-            self._later.update_arvados_node(arvados_node)
-            return self.cloud_node.id
-        else:
-            return None
-
-    def update_cloud_node(self, cloud_node):
-        if cloud_node is not None:
-            self.cloud_node = cloud_node
-            self._later.consider_shutdown()
-
-    def update_arvados_node(self, arvados_node):
-        if arvados_node is not None:
-            self.arvados_node = arvados_node
-            new_hostname = arvados_node_fqdn(self.arvados_node)
-            if new_hostname != self.cloud_node.name:
-                self._update.sync_node(self.cloud_node, self.arvados_node)
-            self._later.consider_shutdown()