Merge branch '8784-dir-listings'
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
index 63effe9d37bd1150380c5723f02ca15db7028c0b..8a4e5f312b505bfa22cc03cb09b4be6198a04bf0 100644 (file)
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
+import calendar
 import functools
 import itertools
-import logging
+import re
 import time
 
-import pykka
+from ..config import CLOUD_ERRORS
+from libcloud.common.exceptions import BaseHTTPError
 
-from ..clientactor import _notify_subscribers
-from .. import config
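+# Arvados timestamps look like '2017-06-01T00:00:00Z', optionally with a
+# fractional-seconds part (e.g. '2017-06-01T00:00:00.123Z') that
+# ARVADOS_TIMESUBSEC_RE captures separately.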
+ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
+ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
 
 def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
     hostname = arvados_node.get('hostname') or default_hostname
     return '{}.{}'.format(hostname, arvados_node['domain'])
 
 def arvados_node_mtime(node):
-    return time.mktime(time.strptime(node['modified_at'] + 'UTC',
-                                     '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
+    return arvados_timestamp(node['modified_at'])
+
+def arvados_timestamp(timestr):
+    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
+    if subsec_match is None:
+        subsecs = 0.0
+    else:
+        subsecs = float(subsec_match.group(1))
+        timestr = timestr[:subsec_match.start()] + 'Z'
+    return calendar.timegm(time.strptime(timestr + 'UTC',
+                                         ARVADOS_TIMEFMT + '%Z')) + subsecs
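+# For example, arvados_timestamp('2017-06-01T00:00:00.123Z') splits off
+# the '.123' fraction, parses the rest as UTC, and returns the Unix
+# timestamp 1496275200.123.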
 
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
 
-class BaseComputeNodeDriver(object):
-    """Abstract base class for compute node drivers.
+def arvados_node_missing(arvados_node, fresh_time):
+    """Indicate if cloud node corresponding to the arvados
+    node is "missing".
 
-    libcloud abstracts away many of the differences between cloud providers,
-    but managing compute nodes requires some cloud-specific features (e.g.,
-    on EC2 we use tags to identify compute nodes).  Compute node drivers
-    are responsible for translating the node manager's cloud requests to a
-    specific cloud's vocabulary.
-
-    Subclasses must implement arvados_create_kwargs (to update node
-    creation kwargs with information about the specific Arvados node
-    record), sync_node, and node_start_time.
+    Returns True if the node has not pinged the API server within the
+    timeout period, False if the ping is up to date, and None if the
+    node has never pinged at all.
     """
-    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
-        self.real = driver_class(**auth_kwargs)
-        self.list_kwargs = list_kwargs
-        self.create_kwargs = create_kwargs
-
-    def __getattr__(self, name):
-        # Proxy non-extension methods to the real driver.
-        if (not name.startswith('_') and not name.startswith('ex_')
-              and hasattr(self.real, name)):
-            return getattr(self.real, name)
-        else:
-            return super(BaseComputeNodeDriver, self).__getattr__(name)
-
-    def search_for(self, term, list_method, key=lambda item: item.id):
-        cache_key = (list_method, term)
-        if cache_key not in self.SEARCH_CACHE:
-            results = [item for item in getattr(self.real, list_method)()
-                       if key(item) == term]
-            count = len(results)
-            if count != 1:
-                raise ValueError("{} returned {} results for '{}'".format(
-                        list_method, count, term))
-            self.SEARCH_CACHE[cache_key] = results[0]
-        return self.SEARCH_CACHE[cache_key]
-
-    def list_nodes(self):
-        return self.real.list_nodes(**self.list_kwargs)
-
-    def arvados_create_kwargs(self, arvados_node):
-        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
-
-    def create_node(self, size, arvados_node):
-        kwargs = self.create_kwargs.copy()
-        kwargs.update(self.arvados_create_kwargs(arvados_node))
-        kwargs['size'] = size
-        return self.real.create_node(**kwargs)
+    if arvados_node["last_ping_at"] is None:
+        return None
+    else:
+        return not timestamp_fresh(arvados_timestamp(arvados_node["last_ping_at"]), fresh_time)
 
-    def sync_node(self, cloud_node, arvados_node):
-        # When a compute node first pings the API server, the API server
-        # will automatically assign some attributes on the corresponding
-        # node record, like hostname.  This method should propagate that
-        # information back to the cloud node appropriately.
-        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
+class RetryMixin(object):
+    """Retry decorator for an method that makes remote requests.
 
-    @classmethod
-    def node_start_time(cls, node):
-        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
+    Use this function to decorate a method, and pass in a tuple of exceptions
+    to catch.  If the original method raises a known cloud driver error, or
+    any of the given exception types, this decorator retries it with
+    exponential backoff, either by sleeping (if self._timer is None) or by
+    scheduling retries of the method (if self._timer is a timer actor).
 
-
-ComputeNodeDriverClass = BaseComputeNodeDriver
-
-class ComputeNodeStateChangeBase(config.actor_class):
-    """Base class for actors that change a compute node's state.
-
-    This base class takes care of retrying changes and notifying
-    subscribers when the change is finished.
     """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
-        super(ComputeNodeStateChangeBase, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
-        self._logger = logging.getLogger(logger_name)
+    def __init__(self, retry_wait, max_retry_wait,
+                 logger, cloud, timer=None):
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self.retry_wait = retry_wait
-        self.subscribers = set()
+        self._logger = logger
+        self._cloud = cloud
+        self._timer = timer
 
     @staticmethod
-    def _retry(errors):
-        """Retry decorator for an actor method that makes remote requests.
-
-        Use this function to decorator an actor method, and pass in a
-        tuple of exceptions to catch.  This decorator will schedule
-        retries of that method with exponential backoff if the
-        original method raises any of the given errors.
-        """
+    def _retry(errors=()):
         def decorator(orig_func):
             @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
-                try:
-                    orig_func(self, *args, **kwargs)
-                except errors as error:
+            def retry_wrapper(self, *args, **kwargs):
+                while True:
+                    should_retry = False
+                    try:
+                        ret = orig_func(self, *args, **kwargs)
+                    except BaseHTTPError as error:
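+                        # Honor a Retry-After header when present (clamped
+                        # to max_retry_wait); also retry 429 and 5xx codes.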
+                        if error.headers and error.headers.get("retry-after"):
+                            try:
+                                self.retry_wait = int(error.headers["retry-after"])
+                                if self.retry_wait < 0 or self.retry_wait > self.max_retry_wait:
+                                    self.retry_wait = self.max_retry_wait
+                                should_retry = True
+                            except ValueError:
+                                pass
+                        if error.code == 429 or error.code >= 500:
+                            should_retry = True
+                    except CLOUD_ERRORS as error:
+                        should_retry = True
+                    except errors as error:
+                        should_retry = True
+                    except Exception as error:
+                        # As a libcloud workaround for drivers that don't use
+                        # typed exceptions, consider bare Exception() objects
+                        # retryable.
+                        should_retry = type(error) is Exception
+                    else:
+                        # No exception, so reset the retry wait and return.
+                        self.retry_wait = self.min_retry_wait
+                        return ret
+
+                    # Only got here if an exception was caught.  Now determine what to do about it.
+                    if not should_retry:
+                        self.retry_wait = self.min_retry_wait
+                        self._logger.warning(
+                            "Re-raising error (no retry): %s",
+                            error, exc_info=error)
+                        raise
+
                     self._logger.warning(
-                        "Client error: %s - waiting %s seconds",
-                        error, self.retry_wait)
-                    self._timer.schedule(self.retry_wait,
-                                         getattr(self._later,
-                                                 orig_func.__name__),
-                                         *args, **kwargs)
+                        "Client error: %s - %s %s seconds",
+                        error,
+                        "scheduling retry in" if self._timer else "sleeping",
+                        self.retry_wait,
+                        exc_info=error)
+
+                    if self._timer:
+                        start_time = time.time()
+                        # reschedule to be called again
+                        self._timer.schedule(start_time + self.retry_wait,
+                                             getattr(self._later,
+                                                     orig_func.__name__),
+                                             *args, **kwargs)
+                    else:
+                        # No timer actor, so just sleep before retrying.
+                        time.sleep(self.retry_wait)
+
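+                    # Back off exponentially, capped at max_retry_wait.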
                     self.retry_wait = min(self.retry_wait * 2,
                                           self.max_retry_wait)
-                else:
-                    self.retry_wait = self.min_retry_wait
-            return wrapper
-        return decorator
-
-    def _finished(self):
-        _notify_subscribers(self._later, self.subscribers)
-        self.subscribers = None
-
-    def subscribe(self, subscriber):
-        if self.subscribers is None:
-            try:
-                subscriber(self._later)
-            except pykka.ActorDeadError:
-                pass
-        else:
-            self.subscribers.add(subscriber)
-
-
-class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
-    """Actor to create and set up a cloud compute node.
-
-    This actor prepares an Arvados node record for a new compute node
-    (either creating one or cleaning one passed in), then boots the
-    actual compute node.  It notifies subscribers when the cloud node
-    is successfully created (the last step in the process for Node
-    Manager to handle).
-    """
-    def __init__(self, timer_actor, arvados_client, cloud_client,
-                 cloud_size, arvados_node=None,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
-        self._arvados = arvados_client
-        self._cloud = cloud_client
-        self.cloud_size = cloud_size
-        self.arvados_node = None
-        self.cloud_node = None
-        if arvados_node is None:
-            self._later.create_arvados_node()
-        else:
-            self._later.prepare_arvados_node(arvados_node)
-
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
-    def create_arvados_node(self):
-        self.arvados_node = self._arvados.nodes().create(body={}).execute()
-        self._later.create_cloud_node()
-
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
-    def prepare_arvados_node(self, node):
-        self.arvados_node = self._arvados.nodes().update(
-            uuid=node['uuid'],
-            body={'hostname': None,
-                  'ip_address': None,
-                  'slot_number': None,
-                  'first_ping_at': None,
-                  'last_ping_at': None,
-                  'info': {'ec2_instance_id': None,
-                           'last_action': "Prepared by Node Manager"}}
-            ).execute()
-        self._later.create_cloud_node()
-
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def create_cloud_node(self):
-        self._logger.info("Creating cloud node with size %s.",
-                          self.cloud_size.name)
-        self.cloud_node = self._cloud.create_node(self.cloud_size,
-                                                  self.arvados_node)
-        self._logger.info("Cloud node %s created.", self.cloud_node.id)
-        self._finished()
-
-    def stop_if_no_cloud_node(self):
-        if self.cloud_node is None:
-            self.stop()
-
-
-class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
-    """Actor to shut down a compute node.
-
-    This actor simply destroys a cloud node, retrying as needed.
-    """
-    def __init__(self, timer_actor, cloud_client, cloud_node,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
-        self.cloud_node = cloud_node
-        self._later.shutdown_node()
-
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def shutdown_node(self):
-        self._cloud.destroy_node(self.cloud_node)
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
-        self._finished()
-
-
-class ComputeNodeUpdateActor(config.actor_class):
-    """Actor to dispatch one-off cloud management requests.
-
-    This actor receives requests for small cloud updates, and
-    dispatches them to a real driver.  ComputeNodeMonitorActors use
-    this to perform maintenance tasks on themselves.  Having a
-    dedicated actor for this gives us the opportunity to control the
-    flow of requests; e.g., by backing off when errors occur.
-
-    This actor is most like a "traditional" Pykka actor: there's no
-    subscribing, but instead methods return real driver results.  If
-    you're interested in those results, you should get them from the
-    Future that the proxy method returns.  Be prepared to handle exceptions
-    from the cloud driver when you do.
-    """
-    def __init__(self, cloud_factory, max_retry_wait=180):
-        super(ComputeNodeUpdateActor, self).__init__()
-        self._cloud = cloud_factory()
-        self.max_retry_wait = max_retry_wait
-        self.error_streak = 0
-        self.next_request_time = time.time()
-
-    def _throttle_errors(orig_func):
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            throttle_time = self.next_request_time - time.time()
-            if throttle_time > 0:
-                time.sleep(throttle_time)
-            self.next_request_time = time.time()
-            try:
-                result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
-                self.error_streak += 1
-                self.next_request_time += min(2 ** self.error_streak,
-                                              self.max_retry_wait)
-                raise
-            else:
-                self.error_streak = 0
-                return result
-        return wrapper
-
-    @_throttle_errors
-    def sync_node(self, cloud_node, arvados_node):
-        return self._cloud.sync_node(cloud_node, arvados_node)
+                    if self._timer:
+                        # The timer will call the method again, so don't loop here.
+                        return
 
+            return retry_wrapper
+        return decorator
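+
+# Illustrative sketch (not part of this module): a class that makes cloud
+# requests can mix in RetryMixin and decorate those methods, e.g.
+#
+#     class NodeLister(RetryMixin):
+#         @RetryMixin._retry((IOError,))
+#         def list_nodes(self):
+#             return self._cloud.list_nodes()
+#
+# With timer=None the wrapper sleeps between attempts; with a timer actor
+# it schedules a retry of the method on self._later instead of looping.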
 
 class ShutdownTimer(object):
     """Keep track of a cloud node's shutdown windows.
@@ -309,103 +179,3 @@ class ShutdownTimer(object):
     def window_open(self):
         self._advance_opening()
         return 0 < (time.time() - self._open_start) < self._open_for
-
-
-class ComputeNodeMonitorActor(config.actor_class):
-    """Actor to manage a running compute node.
-
-    This actor gets updates about a compute node's cloud and Arvados records.
-    It uses this information to notify subscribers when the node is eligible
-    for shutdown.
-    """
-    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 timer_actor, update_actor, arvados_node=None,
-                 poll_stale_after=600, node_stale_after=3600):
-        super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.computenode')
-        self._last_log = None
-        self._shutdowns = shutdown_timer
-        self._timer = timer_actor
-        self._update = update_actor
-        self.cloud_node = cloud_node
-        self.cloud_node_start_time = cloud_node_start_time
-        self.poll_stale_after = poll_stale_after
-        self.node_stale_after = node_stale_after
-        self.subscribers = set()
-        self.arvados_node = None
-        self._later.update_arvados_node(arvados_node)
-        self.last_shutdown_opening = None
-        self._later.consider_shutdown()
-
-    def subscribe(self, subscriber):
-        self.subscribers.add(subscriber)
-
-    def _debug(self, msg, *args):
-        if msg == self._last_log:
-            return
-        self._last_log = msg
-        self._logger.debug(msg, *args)
-
-    def in_state(self, *states):
-        # Return a boolean to say whether or not our Arvados node record is in
-        # one of the given states.  If state information is not
-        # available--because this node has no Arvados record, the record is
-        # stale, or the record has no state information--return None.
-        if (self.arvados_node is None) or not timestamp_fresh(
-              arvados_node_mtime(self.arvados_node), self.node_stale_after):
-            return None
-        state = self.arvados_node['info'].get('slurm_state')
-        if not state:
-            return None
-        result = state in states
-        if state == 'idle':
-            result = result and not self.arvados_node['job_uuid']
-        return result
-
-    def _shutdown_eligible(self):
-        if self.arvados_node is None:
-            # If this is a new, unpaired node, it's eligible for
-            # shutdown--we figure there was an error during bootstrap.
-            return timestamp_fresh(self.cloud_node_start_time,
-                                   self.node_stale_after)
-        else:
-            return self.in_state('idle')
-
-    def consider_shutdown(self):
-        next_opening = self._shutdowns.next_opening()
-        if self._shutdowns.window_open():
-            if self._shutdown_eligible():
-                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-                _notify_subscribers(self._later, self.subscribers)
-            else:
-                self._debug("Node %s shutdown window open but node busy.",
-                            self.cloud_node.id)
-        else:
-            self._debug("Node %s shutdown window closed.  Next at %s.",
-                        self.cloud_node.id, time.ctime(next_opening))
-        if self.last_shutdown_opening != next_opening:
-            self._timer.schedule(next_opening, self._later.consider_shutdown)
-            self.last_shutdown_opening = next_opening
-
-    def offer_arvados_pair(self, arvados_node):
-        if self.arvados_node is not None:
-            return None
-        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
-            self._later.update_arvados_node(arvados_node)
-            return self.cloud_node.id
-        else:
-            return None
-
-    def update_cloud_node(self, cloud_node):
-        if cloud_node is not None:
-            self.cloud_node = cloud_node
-            self._later.consider_shutdown()
-
-    def update_arvados_node(self, arvados_node):
-        if arvados_node is not None:
-            self.arvados_node = arvados_node
-            new_hostname = arvados_node_fqdn(self.arvados_node)
-            if new_hostname != self.cloud_node.name:
-                self._update.sync_node(self.cloud_node, self.arvados_node)
-            self._later.consider_shutdown()