Arvados-DCO-1.1-Signed-off-by: Radhika Chippada <radhika@curoverse.com>
[arvados.git] / services / nodemanager / arvnodeman / computenode / driver / __init__.py
index 724c772733ae0ed1479e7b31d24238a87b591b1f..8f881b04e35341a738b19e1d233d0e851166c9db 100644 (file)
@@ -1,15 +1,20 @@
 #!/usr/bin/env python
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
 
 from __future__ import absolute_import, print_function
 
+import logging
 from operator import attrgetter
 
 import libcloud.common.types as cloud_types
 from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
 
 from operator import attrgetter
 
 import libcloud.common.types as cloud_types
 from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
 
-from ...config import NETWORK_ERRORS
+from ...config import CLOUD_ERRORS
+from .. import RetryMixin
 
 
-class BaseComputeNodeDriver(object):
+class BaseComputeNodeDriver(RetryMixin):
     """Abstract base class for compute node drivers.
 
     libcloud drivers abstract away many of the differences between
     """Abstract base class for compute node drivers.
 
     libcloud drivers abstract away many of the differences between
@@ -22,9 +27,18 @@ class BaseComputeNodeDriver(object):
     Subclasses must implement arvados_create_kwargs, sync_node,
     node_fqdn, and node_start_time.
     """
     Subclasses must implement arvados_create_kwargs, sync_node,
     node_fqdn, and node_start_time.
     """
-    CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
 
 
-    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+
+    @RetryMixin._retry()
+    def _create_driver(self, driver_class, **auth_kwargs):
+        return driver_class(**auth_kwargs)
+
+    @RetryMixin._retry()
+    def _set_sizes(self):
+        self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+
+    def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+                 driver_class, retry_wait=1, max_retry_wait=180):
         """Base initializer for compute node drivers.
 
         Arguments:
         """Base initializer for compute node drivers.
 
         Arguments:
@@ -37,7 +51,12 @@ class BaseComputeNodeDriver(object):
           libcloud driver's create_node method to create a new compute node.
         * driver_class: The class of a libcloud driver to use.
         """
           libcloud driver's create_node method to create a new compute node.
         * driver_class: The class of a libcloud driver to use.
         """
-        self.real = driver_class(**auth_kwargs)
+
+        super(BaseComputeNodeDriver, self).__init__(retry_wait, max_retry_wait,
+                                         logging.getLogger(self.__class__.__name__),
+                                         type(self),
+                                         None)
+        self.real = self._create_driver(driver_class, **auth_kwargs)
         self.list_kwargs = list_kwargs
         self.create_kwargs = create_kwargs
         # Transform entries in create_kwargs.  For each key K, if this class
         self.list_kwargs = list_kwargs
         self.create_kwargs = create_kwargs
         # Transform entries in create_kwargs.  For each key K, if this class
@@ -53,6 +72,8 @@ class BaseComputeNodeDriver(object):
                 if new_pair is not None:
                     self.create_kwargs[new_pair[0]] = new_pair[1]
 
                 if new_pair is not None:
                     self.create_kwargs[new_pair[0]] = new_pair[1]
 
+        self._set_sizes()
+
     def _init_ping_host(self, ping_host):
         self.ping_host = ping_host
 
     def _init_ping_host(self, ping_host):
         self.ping_host = ping_host
 
@@ -61,35 +82,62 @@ class BaseComputeNodeDriver(object):
             key = NodeAuthSSHKey(ssh_file.read())
         return 'auth', key
 
             key = NodeAuthSSHKey(ssh_file.read())
         return 'auth', key
 
-    def search_for(self, term, list_method, key=attrgetter('id'), **kwargs):
+    def search_for_now(self, term, list_method, key=attrgetter('id'), **kwargs):
         """Return one matching item from a list of cloud objects.
 
         Raises ValueError if the number of matching objects is not exactly 1.
 
         Arguments:
         * term: The value that identifies a matching item.
         """Return one matching item from a list of cloud objects.
 
         Raises ValueError if the number of matching objects is not exactly 1.
 
         Arguments:
         * term: The value that identifies a matching item.
-        * list_method: A string that names the method to call on this
-          instance's libcloud driver for a list of objects.
+        * list_method: A string that names the method to call for a
+          list of objects.
         * key: A function that accepts a cloud object and returns a
           value search for a `term` match on each item.  Returns the
           object's 'id' attribute by default.
         """
         * key: A function that accepts a cloud object and returns a
           value search for a `term` match on each item.  Returns the
           object's 'id' attribute by default.
         """
+        try:
+            list_func = getattr(self, list_method)
+        except AttributeError:
+            list_func = getattr(self.real, list_method)
+        items = list_func(**kwargs)
+        results = [item for item in items if key(item) == term]
+        count = len(results)
+        if count != 1:
+            raise ValueError("{} returned {} results for {!r}".format(
+                    list_method, count, term))
+        return results[0]
+
+    def search_for(self, term, list_method, key=attrgetter('id'), **kwargs):
+        """Return one cached matching item from a list of cloud objects.
+
+        See search_for_now() for details of arguments and exceptions.
+        This method caches results, so it's good to find static cloud objects
+        like node sizes, regions, etc.
+        """
         cache_key = (list_method, term)
         if cache_key not in self.SEARCH_CACHE:
         cache_key = (list_method, term)
         if cache_key not in self.SEARCH_CACHE:
-            items = getattr(self.real, list_method)(**kwargs)
-            results = [item for item in items
-                       if key(item) == term]
-            count = len(results)
-            if count != 1:
-                raise ValueError("{} returned {} results for '{}'".format(
-                        list_method, count, term))
-            self.SEARCH_CACHE[cache_key] = results[0]
+            self.SEARCH_CACHE[cache_key] = self.search_for_now(
+                term, list_method, key, **kwargs)
         return self.SEARCH_CACHE[cache_key]
 
         return self.SEARCH_CACHE[cache_key]
 
-    def list_nodes(self):
-        return self.real.list_nodes(**self.list_kwargs)
+    def list_nodes(self, **kwargs):
+        l = self.list_kwargs.copy()
+        l.update(kwargs)
+        return self.real.list_nodes(**l)
 
 
-    def arvados_create_kwargs(self, arvados_node):
+    def create_cloud_name(self, arvados_node):
+        """Return a cloud node name for the given Arvados node record.
+
+        Subclasses must override this method.  It should return a string
+        that can be used as the name for a newly-created cloud node,
+        based on identifying information in the Arvados node record.
+
+        Arguments:
+        * arvados_node: This Arvados node record to seed the new cloud node.
+        """
+        raise NotImplementedError("BaseComputeNodeDriver.create_cloud_name")
+
+    def arvados_create_kwargs(self, size, arvados_node):
         """Return dynamic keyword arguments for create_node.
 
         Subclasses must override this method.  It should return a dictionary
         """Return dynamic keyword arguments for create_node.
 
         Subclasses must override this method.  It should return a dictionary
@@ -98,21 +146,42 @@ class BaseComputeNodeDriver(object):
         create_kwargs.
 
         Arguments:
         create_kwargs.
 
         Arguments:
+        * size: The node size that will be created (libcloud NodeSize object)
         * arvados_node: The Arvados node record that will be associated
           with this cloud node, as returned from the API server.
         """
         raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
 
         * arvados_node: The Arvados node record that will be associated
           with this cloud node, as returned from the API server.
         """
         raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
 
+    def broken(self, cloud_node):
+        """Return true if libcloud has indicated the node is in a "broken" state."""
+        return False
+
     def _make_ping_url(self, arvados_node):
         return 'https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'.format(
             self.ping_host, arvados_node['uuid'],
             arvados_node['info']['ping_secret'])
 
     def _make_ping_url(self, arvados_node):
         return 'https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'.format(
             self.ping_host, arvados_node['uuid'],
             arvados_node['info']['ping_secret'])
 
+    @staticmethod
+    def _name_key(cloud_object):
+        return cloud_object.name
+
     def create_node(self, size, arvados_node):
     def create_node(self, size, arvados_node):
-        kwargs = self.create_kwargs.copy()
-        kwargs.update(self.arvados_create_kwargs(arvados_node))
-        kwargs['size'] = size
-        return self.real.create_node(**kwargs)
+        try:
+            kwargs = self.create_kwargs.copy()
+            kwargs.update(self.arvados_create_kwargs(size, arvados_node))
+            kwargs['size'] = size
+            return self.real.create_node(**kwargs)
+        except CLOUD_ERRORS as create_error:
+            # Workaround for bug #6702: sometimes the create node request
+            # succeeds but times out and raises an exception instead of
+            # returning a result.  If this happens, we get stuck in a retry
+            # loop forever because subsequent create_node attempts will fail
+            # due to node name collision.  So check if the node we intended to
+            # create shows up in the cloud node list and return it if found.
+            try:
+                return self.search_for_now(kwargs['name'], 'list_nodes', self._name_key)
+            except ValueError:
+                raise create_error
 
     def post_create_node(self, cloud_node):
         # ComputeNodeSetupActor calls this method after the cloud node is
 
     def post_create_node(self, cloud_node):
         # ComputeNodeSetupActor calls this method after the cloud node is
@@ -139,13 +208,22 @@ class BaseComputeNodeDriver(object):
         # seconds since the epoch UTC.
         raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
 
         # seconds since the epoch UTC.
         raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
 
-    @classmethod
-    def is_cloud_exception(cls, exception):
-        # libcloud compute drivers typically raise bare Exceptions to
-        # represent API errors.  Return True for any exception that is
-        # exactly an Exception, or a better-known higher-level exception.
-        return (isinstance(exception, cls.CLOUD_ERRORS) or
-                type(exception) is Exception)
+    def destroy_node(self, cloud_node):
+        try:
+            return self.real.destroy_node(cloud_node)
+        except CLOUD_ERRORS as destroy_error:
+            # Sometimes the destroy node request succeeds but times out and
+            # raises an exception instead of returning success.  If this
+            # happens, we get a noisy stack trace.  Check if the node is still
+            # on the node list.  If it is gone, we can declare victory.
+            try:
+                self.search_for_now(cloud_node.id, 'list_nodes')
+            except ValueError:
+                # If we catch ValueError, that means search_for_now didn't find
+                # it, which means destroy_node actually succeeded.
+                return True
+            # The node is still on the list.  Re-raise.
+            raise
 
     # Now that we've defined all our own methods, delegate generic, public
     # attributes of libcloud drivers that we haven't defined ourselves.
 
     # Now that we've defined all our own methods, delegate generic, public
     # attributes of libcloud drivers that we haven't defined ourselves.
@@ -155,6 +233,11 @@ class BaseComputeNodeDriver(object):
             lambda self, value: setattr(self.real, attr_name, value),
             doc=getattr(getattr(NodeDriver, attr_name), '__doc__', None))
 
             lambda self, value: setattr(self.real, attr_name, value),
             doc=getattr(getattr(NodeDriver, attr_name), '__doc__', None))
 
+    # node id
+    @classmethod
+    def node_id(cls):
+        raise NotImplementedError("BaseComputeNodeDriver.node_id")
+
     _locals = locals()
     for _attr_name in dir(NodeDriver):
         if (not _attr_name.startswith('_')) and (_attr_name not in _locals):
     _locals = locals()
     for _attr_name in dir(NodeDriver):
         if (not _attr_name.startswith('_')) and (_attr_name not in _locals):