Merge branch 'master' into 13822-nm-delayed-daemon
[arvados.git] / services / nodemanager / arvnodeman / computenode / driver / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import logging
9 from operator import attrgetter
10
11 import libcloud.common.types as cloud_types
12 from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
13
14 from ...config import CLOUD_ERRORS
15 from ...status import tracker
16 from .. import RetryMixin
17
18 class BaseComputeNodeDriver(RetryMixin):
19     """Abstract base class for compute node drivers.
20
21     libcloud drivers abstract away many of the differences between
22     cloud providers, but managing compute nodes requires some
23     cloud-specific features (e.g., keeping track of node FQDNs and
24     boot times).  Compute node drivers are responsible for translating
25     the node manager's cloud requests to a specific cloud's
26     vocabulary.
27
28     Subclasses must implement arvados_create_kwargs, sync_node,
29     node_fqdn, and node_start_time.
30     """
31
32
33     @RetryMixin._retry()
34     def _create_driver(self, driver_class, **auth_kwargs):
35         return driver_class(**auth_kwargs)
36
37     @RetryMixin._retry()
38     def sizes(self):
39         if self._sizes is None:
40             self._sizes = {sz.id: sz for sz in self.real.list_sizes()}
41         return self._sizes
42
43     def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
44                  driver_class, retry_wait=1, max_retry_wait=180):
45         """Base initializer for compute node drivers.
46
47         Arguments:
48         * auth_kwargs: A dictionary of arguments that are passed into the
49           driver_class constructor to instantiate a libcloud driver.
50         * list_kwargs: A dictionary of arguments that are passed to the
51           libcloud driver's list_nodes method to return the list of compute
52           nodes.
53         * create_kwargs: A dictionary of arguments that are passed to the
54           libcloud driver's create_node method to create a new compute node.
55         * driver_class: The class of a libcloud driver to use.
56         """
57
58         super(BaseComputeNodeDriver, self).__init__(retry_wait, max_retry_wait,
59                                          logging.getLogger(self.__class__.__name__),
60                                          type(self),
61                                          None)
62         self.real = self._create_driver(driver_class, **auth_kwargs)
63         self.list_kwargs = list_kwargs
64         self.create_kwargs = create_kwargs
65         # Transform entries in create_kwargs.  For each key K, if this class
66         # has an _init_K method, remove the entry and call _init_K with the
67         # corresponding value.  If _init_K returns None, the entry stays out
68         # of the dictionary (we expect we're holding the value somewhere
69         # else, like an instance variable).  Otherwise, _init_K returns a
70         # key-value tuple pair, and we add that entry to create_kwargs.
71         for key in self.create_kwargs.keys():
72             init_method = getattr(self, '_init_' + key, None)
73             if init_method is not None:
74                 new_pair = init_method(self.create_kwargs.pop(key))
75                 if new_pair is not None:
76                     self.create_kwargs[new_pair[0]] = new_pair[1]
77
78         self._sizes = None
79
80     def _init_ping_host(self, ping_host):
81         self.ping_host = ping_host
82
83     def _init_ssh_key(self, filename):
84         with open(filename) as ssh_file:
85             key = NodeAuthSSHKey(ssh_file.read())
86         return 'auth', key
87
88     def search_for_now(self, term, list_method, key=attrgetter('id'), **kwargs):
89         """Return one matching item from a list of cloud objects.
90
91         Raises ValueError if the number of matching objects is not exactly 1.
92
93         Arguments:
94         * term: The value that identifies a matching item.
95         * list_method: A string that names the method to call for a
96           list of objects.
97         * key: A function that accepts a cloud object and returns a
98           value search for a `term` match on each item.  Returns the
99           object's 'id' attribute by default.
100         """
101         try:
102             list_func = getattr(self, list_method)
103         except AttributeError:
104             list_func = getattr(self.real, list_method)
105         items = list_func(**kwargs)
106         results = [item for item in items if key(item) == term]
107         count = len(results)
108         if count != 1:
109             raise ValueError("{} returned {} results for {!r}".format(
110                     list_method, count, term))
111         return results[0]
112
113     def search_for(self, term, list_method, key=attrgetter('id'), **kwargs):
114         """Return one cached matching item from a list of cloud objects.
115
116         See search_for_now() for details of arguments and exceptions.
117         This method caches results, so it's good to find static cloud objects
118         like node sizes, regions, etc.
119         """
120         cache_key = (list_method, term)
121         if cache_key not in self.SEARCH_CACHE:
122             self.SEARCH_CACHE[cache_key] = self.search_for_now(
123                 term, list_method, key, **kwargs)
124         return self.SEARCH_CACHE[cache_key]
125
126     def list_nodes(self, **kwargs):
127         l = self.list_kwargs.copy()
128         l.update(kwargs)
129         try:
130             return self.real.list_nodes(**l)
131         except CLOUD_ERRORS:
132             tracker.counter_add('list_nodes_errors')
133             raise
134
135     def create_cloud_name(self, arvados_node):
136         """Return a cloud node name for the given Arvados node record.
137
138         Subclasses must override this method.  It should return a string
139         that can be used as the name for a newly-created cloud node,
140         based on identifying information in the Arvados node record.
141
142         Arguments:
143         * arvados_node: This Arvados node record to seed the new cloud node.
144         """
145         raise NotImplementedError("BaseComputeNodeDriver.create_cloud_name")
146
147     def arvados_create_kwargs(self, size, arvados_node):
148         """Return dynamic keyword arguments for create_node.
149
150         Subclasses must override this method.  It should return a dictionary
151         of keyword arguments to pass to the libcloud driver's create_node
152         method.  These arguments will extend the static arguments in
153         create_kwargs.
154
155         Arguments:
156         * size: The node size that will be created (libcloud NodeSize object)
157         * arvados_node: The Arvados node record that will be associated
158           with this cloud node, as returned from the API server.
159         """
160         raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
161
162     def broken(self, cloud_node):
163         """Return true if libcloud has indicated the node is in a "broken" state."""
164         return False
165
166     def _make_ping_url(self, arvados_node):
167         return 'https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'.format(
168             self.ping_host, arvados_node['uuid'],
169             arvados_node['info']['ping_secret'])
170
171     @staticmethod
172     def _name_key(cloud_object):
173         return cloud_object.name
174
175     def create_node(self, size, arvados_node):
176         try:
177             kwargs = self.create_kwargs.copy()
178             kwargs.update(self.arvados_create_kwargs(size, arvados_node))
179             kwargs['size'] = size.real
180             return self.real.create_node(**kwargs)
181         except CLOUD_ERRORS as create_error:
182             # Workaround for bug #6702: sometimes the create node request
183             # succeeds but times out and raises an exception instead of
184             # returning a result.  If this happens, we get stuck in a retry
185             # loop forever because subsequent create_node attempts will fail
186             # due to node name collision.  So check if the node we intended to
187             # create shows up in the cloud node list and return it if found.
188             try:
189                 return self.search_for_now(kwargs['name'], 'list_nodes', self._name_key)
190             except ValueError:
191                 tracker.counter_add('create_node_errors')
192                 raise create_error
193
194     def post_create_node(self, cloud_node):
195         # ComputeNodeSetupActor calls this method after the cloud node is
196         # created.  Any setup tasks that need to happen afterward (e.g.,
197         # tagging) should be done in this method.
198         pass
199
200     def sync_node(self, cloud_node, arvados_node):
201         # When a compute node first pings the API server, the API server
202         # will automatically assign some attributes on the corresponding
203         # node record, like hostname.  This method should propagate that
204         # information back to the cloud node appropriately.
205         raise NotImplementedError("BaseComputeNodeDriver.sync_node")
206
207     @classmethod
208     def node_fqdn(cls, node):
209         # This method should return the FQDN of the node object argument.
210         # Different clouds store this in different places.
211         raise NotImplementedError("BaseComputeNodeDriver.node_fqdn")
212
213     @classmethod
214     def node_start_time(cls, node):
215         # This method should return the time the node was started, in
216         # seconds since the epoch UTC.
217         raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
218
219     def destroy_node(self, cloud_node):
220         try:
221             return self.real.destroy_node(cloud_node)
222         except CLOUD_ERRORS:
223             # Sometimes the destroy node request succeeds but times out and
224             # raises an exception instead of returning success.  If this
225             # happens, we get a noisy stack trace.  Check if the node is still
226             # on the node list.  If it is gone, we can declare victory.
227             try:
228                 self.search_for_now(cloud_node.id, 'list_nodes')
229             except ValueError:
230                 # If we catch ValueError, that means search_for_now didn't find
231                 # it, which means destroy_node actually succeeded.
232                 return True
233             # The node is still on the list.  Re-raise.
234             tracker.counter_add('destroy_node_errors')
235             raise
236
237     # Now that we've defined all our own methods, delegate generic, public
238     # attributes of libcloud drivers that we haven't defined ourselves.
239     def _delegate_to_real(attr_name):
240         return property(
241             lambda self: getattr(self.real, attr_name),
242             lambda self, value: setattr(self.real, attr_name, value),
243             doc=getattr(getattr(NodeDriver, attr_name), '__doc__', None))
244
245     # node id
246     @classmethod
247     def node_id(cls):
248         raise NotImplementedError("BaseComputeNodeDriver.node_id")
249
250     _locals = locals()
251     for _attr_name in dir(NodeDriver):
252         if (not _attr_name.startswith('_')) and (_attr_name not in _locals):
253             _locals[_attr_name] = _delegate_to_real(_attr_name)