3826: Merge branch 'master' into 3826-crunchstat-netstats
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import itertools
7 import logging
8 import time
9
10 import pykka
11
12 from ..clientactor import _notify_subscribers
13 from .. import config
14
15 def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
16     hostname = arvados_node.get('hostname') or default_hostname
17     return '{}.{}'.format(hostname, arvados_node['domain'])
18
19 def arvados_node_mtime(node):
20     return time.mktime(time.strptime(node['modified_at'] + 'UTC',
21                                      '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
22
23 def timestamp_fresh(timestamp, fresh_time):
24     return (time.time() - timestamp) < fresh_time
25
26 class BaseComputeNodeDriver(object):
27     """Abstract base class for compute node drivers.
28
29     libcloud abstracts away many of the differences between cloud providers,
30     but managing compute nodes requires some cloud-specific features (e.g.,
31     on EC2 we use tags to identify compute nodes).  Compute node drivers
32     are responsible for translating the node manager's cloud requests to a
33     specific cloud's vocabulary.
34
35     Subclasses must implement arvados_create_kwargs (to update node
36     creation kwargs with information about the specific Arvados node
37     record), sync_node, and node_start_time.
38     """
39     def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
40         self.real = driver_class(**auth_kwargs)
41         self.list_kwargs = list_kwargs
42         self.create_kwargs = create_kwargs
43
44     def __getattr__(self, name):
45         # Proxy non-extension methods to the real driver.
46         if (not name.startswith('_') and not name.startswith('ex_')
47               and hasattr(self.real, name)):
48             return getattr(self.real, name)
49         else:
50             return super(BaseComputeNodeDriver, self).__getattr__(name)
51
52     def search_for(self, term, list_method, key=lambda item: item.id):
53         cache_key = (list_method, term)
54         if cache_key not in self.SEARCH_CACHE:
55             results = [item for item in getattr(self.real, list_method)()
56                        if key(item) == term]
57             count = len(results)
58             if count != 1:
59                 raise ValueError("{} returned {} results for '{}'".format(
60                         list_method, count, term))
61             self.SEARCH_CACHE[cache_key] = results[0]
62         return self.SEARCH_CACHE[cache_key]
63
64     def list_nodes(self):
65         return self.real.list_nodes(**self.list_kwargs)
66
67     def arvados_create_kwargs(self, arvados_node):
68         raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
69
70     def create_node(self, size, arvados_node):
71         kwargs = self.create_kwargs.copy()
72         kwargs.update(self.arvados_create_kwargs(arvados_node))
73         kwargs['size'] = size
74         return self.real.create_node(**kwargs)
75
76     def sync_node(self, cloud_node, arvados_node):
77         # When a compute node first pings the API server, the API server
78         # will automatically assign some attributes on the corresponding
79         # node record, like hostname.  This method should propagate that
80         # information back to the cloud node appropriately.
81         raise NotImplementedError("BaseComputeNodeDriver.sync_node")
82
83     @classmethod
84     def node_start_time(cls, node):
85         raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
86
87
88 ComputeNodeDriverClass = BaseComputeNodeDriver
89
90 class ComputeNodeStateChangeBase(config.actor_class):
91     """Base class for actors that change a compute node's state.
92
93     This base class takes care of retrying changes and notifying
94     subscribers when the change is finished.
95     """
96     def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
97         super(ComputeNodeStateChangeBase, self).__init__()
98         self._later = self.actor_ref.proxy()
99         self._timer = timer_actor
100         self._logger = logging.getLogger(logger_name)
101         self.min_retry_wait = retry_wait
102         self.max_retry_wait = max_retry_wait
103         self.retry_wait = retry_wait
104         self.subscribers = set()
105
106     @staticmethod
107     def _retry(errors):
108         """Retry decorator for an actor method that makes remote requests.
109
110         Use this function to decorator an actor method, and pass in a
111         tuple of exceptions to catch.  This decorator will schedule
112         retries of that method with exponential backoff if the
113         original method raises any of the given errors.
114         """
115         def decorator(orig_func):
116             @functools.wraps(orig_func)
117             def wrapper(self, *args, **kwargs):
118                 try:
119                     orig_func(self, *args, **kwargs)
120                 except errors as error:
121                     self._logger.warning(
122                         "Client error: %s - waiting %s seconds",
123                         error, self.retry_wait)
124                     self._timer.schedule(self.retry_wait,
125                                          getattr(self._later,
126                                                  orig_func.__name__),
127                                          *args, **kwargs)
128                     self.retry_wait = min(self.retry_wait * 2,
129                                           self.max_retry_wait)
130                 else:
131                     self.retry_wait = self.min_retry_wait
132             return wrapper
133         return decorator
134
135     def _finished(self):
136         _notify_subscribers(self._later, self.subscribers)
137         self.subscribers = None
138
139     def subscribe(self, subscriber):
140         if self.subscribers is None:
141             try:
142                 subscriber(self._later)
143             except pykka.ActorDeadError:
144                 pass
145         else:
146             self.subscribers.add(subscriber)
147
148
149 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
150     """Actor to create and set up a cloud compute node.
151
152     This actor prepares an Arvados node record for a new compute node
153     (either creating one or cleaning one passed in), then boots the
154     actual compute node.  It notifies subscribers when the cloud node
155     is successfully created (the last step in the process for Node
156     Manager to handle).
157     """
158     def __init__(self, timer_actor, arvados_client, cloud_client,
159                  cloud_size, arvados_node=None,
160                  retry_wait=1, max_retry_wait=180):
161         super(ComputeNodeSetupActor, self).__init__(
162             'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
163         self._arvados = arvados_client
164         self._cloud = cloud_client
165         self.cloud_size = cloud_size
166         self.arvados_node = None
167         self.cloud_node = None
168         if arvados_node is None:
169             self._later.create_arvados_node()
170         else:
171             self._later.prepare_arvados_node(arvados_node)
172
173     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
174     def create_arvados_node(self):
175         self.arvados_node = self._arvados.nodes().create(body={}).execute()
176         self._later.create_cloud_node()
177
178     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
179     def prepare_arvados_node(self, node):
180         self.arvados_node = self._arvados.nodes().update(
181             uuid=node['uuid'],
182             body={'hostname': None,
183                   'ip_address': None,
184                   'slot_number': None,
185                   'first_ping_at': None,
186                   'last_ping_at': None,
187                   'info': {'ec2_instance_id': None,
188                            'last_action': "Prepared by Node Manager"}}
189             ).execute()
190         self._later.create_cloud_node()
191
192     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
193     def create_cloud_node(self):
194         self._logger.info("Creating cloud node with size %s.",
195                           self.cloud_size.name)
196         self.cloud_node = self._cloud.create_node(self.cloud_size,
197                                                   self.arvados_node)
198         self._logger.info("Cloud node %s created.", self.cloud_node.id)
199         self._finished()
200
201     def stop_if_no_cloud_node(self):
202         if self.cloud_node is None:
203             self.stop()
204
205
206 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
207     """Actor to shut down a compute node.
208
209     This actor simply destroys a cloud node, retrying as needed.
210     """
211     def __init__(self, timer_actor, cloud_client, cloud_node,
212                  retry_wait=1, max_retry_wait=180):
213         super(ComputeNodeShutdownActor, self).__init__(
214             'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
215         self._cloud = cloud_client
216         self.cloud_node = cloud_node
217         self._later.shutdown_node()
218
219     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
220     def shutdown_node(self):
221         self._cloud.destroy_node(self.cloud_node)
222         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
223         self._finished()
224
225
226 class ComputeNodeUpdateActor(config.actor_class):
227     """Actor to dispatch one-off cloud management requests.
228
229     This actor receives requests for small cloud updates, and
230     dispatches them to a real driver.  ComputeNodeMonitorActors use
231     this to perform maintenance tasks on themselves.  Having a
232     dedicated actor for this gives us the opportunity to control the
233     flow of requests; e.g., by backing off when errors occur.
234
235     This actor is most like a "traditional" Pykka actor: there's no
236     subscribing, but instead methods return real driver results.  If
237     you're interested in those results, you should get them from the
238     Future that the proxy method returns.  Be prepared to handle exceptions
239     from the cloud driver when you do.
240     """
241     def __init__(self, cloud_factory, max_retry_wait=180):
242         super(ComputeNodeUpdateActor, self).__init__()
243         self._cloud = cloud_factory()
244         self.max_retry_wait = max_retry_wait
245         self.error_streak = 0
246         self.next_request_time = time.time()
247
248     def _throttle_errors(orig_func):
249         @functools.wraps(orig_func)
250         def wrapper(self, *args, **kwargs):
251             throttle_time = self.next_request_time - time.time()
252             if throttle_time > 0:
253                 time.sleep(throttle_time)
254             self.next_request_time = time.time()
255             try:
256                 result = orig_func(self, *args, **kwargs)
257             except config.CLOUD_ERRORS:
258                 self.error_streak += 1
259                 self.next_request_time += min(2 ** self.error_streak,
260                                               self.max_retry_wait)
261                 raise
262             else:
263                 self.error_streak = 0
264                 return result
265         return wrapper
266
267     @_throttle_errors
268     def sync_node(self, cloud_node, arvados_node):
269         return self._cloud.sync_node(cloud_node, arvados_node)
270
271
272 class ShutdownTimer(object):
273     """Keep track of a cloud node's shutdown windows.
274
275     Instantiate this class with a timestamp of when a cloud node started,
276     and a list of durations (in minutes) of when the node must not and may
277     be shut down, alternating.  The class will tell you when a shutdown
278     window is open, and when the next open window will start.
279     """
280     def __init__(self, start_time, shutdown_windows):
281         # The implementation is easiest if we have an even number of windows,
282         # because then windows always alternate between open and closed.
283         # Rig that up: calculate the first shutdown window based on what's
284         # passed in.  Then, if we were given an odd number of windows, merge
285         # that first window into the last one, since they both# represent
286         # closed state.
287         first_window = shutdown_windows[0]
288         shutdown_windows = list(shutdown_windows[1:])
289         self._next_opening = start_time + (60 * first_window)
290         if len(shutdown_windows) % 2:
291             shutdown_windows.append(first_window)
292         else:
293             shutdown_windows[-1] += first_window
294         self.shutdown_windows = itertools.cycle([60 * n
295                                                  for n in shutdown_windows])
296         self._open_start = self._next_opening
297         self._open_for = next(self.shutdown_windows)
298
299     def _advance_opening(self):
300         while self._next_opening < time.time():
301             self._open_start = self._next_opening
302             self._next_opening += self._open_for + next(self.shutdown_windows)
303             self._open_for = next(self.shutdown_windows)
304
305     def next_opening(self):
306         self._advance_opening()
307         return self._next_opening
308
309     def window_open(self):
310         self._advance_opening()
311         return 0 < (time.time() - self._open_start) < self._open_for
312
313
314 class ComputeNodeMonitorActor(config.actor_class):
315     """Actor to manage a running compute node.
316
317     This actor gets updates about a compute node's cloud and Arvados records.
318     It uses this information to notify subscribers when the node is eligible
319     for shutdown.
320     """
321     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
322                  timer_actor, update_actor, arvados_node=None,
323                  poll_stale_after=600, node_stale_after=3600):
324         super(ComputeNodeMonitorActor, self).__init__()
325         self._later = self.actor_ref.proxy()
326         self._logger = logging.getLogger('arvnodeman.computenode')
327         self._last_log = None
328         self._shutdowns = shutdown_timer
329         self._timer = timer_actor
330         self._update = update_actor
331         self.cloud_node = cloud_node
332         self.cloud_node_start_time = cloud_node_start_time
333         self.poll_stale_after = poll_stale_after
334         self.node_stale_after = node_stale_after
335         self.subscribers = set()
336         self.arvados_node = None
337         self._later.update_arvados_node(arvados_node)
338         self.last_shutdown_opening = None
339         self._later.consider_shutdown()
340
341     def subscribe(self, subscriber):
342         self.subscribers.add(subscriber)
343
344     def _debug(self, msg, *args):
345         if msg == self._last_log:
346             return
347         self._last_log = msg
348         self._logger.debug(msg, *args)
349
350     def _shutdown_eligible(self):
351         if self.arvados_node is None:
352             return timestamp_fresh(self.cloud_node_start_time,
353                                    self.node_stale_after)
354         else:
355             return (timestamp_fresh(arvados_node_mtime(self.arvados_node),
356                                     self.poll_stale_after) and
357                     (self.arvados_node['info'].get('slurm_state') == 'idle'))
358
359     def consider_shutdown(self):
360         next_opening = self._shutdowns.next_opening()
361         if self._shutdowns.window_open():
362             if self._shutdown_eligible():
363                 self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
364                 _notify_subscribers(self._later, self.subscribers)
365             else:
366                 self._debug("Node %s shutdown window open but node busy.",
367                             self.cloud_node.id)
368         else:
369             self._debug("Node %s shutdown window closed.  Next at %s.",
370                         self.cloud_node.id, time.ctime(next_opening))
371         if self.last_shutdown_opening != next_opening:
372             self._timer.schedule(next_opening, self._later.consider_shutdown)
373             self.last_shutdown_opening = next_opening
374
375     def offer_arvados_pair(self, arvados_node):
376         if self.arvados_node is not None:
377             return None
378         elif arvados_node['ip_address'] in self.cloud_node.private_ips:
379             self._later.update_arvados_node(arvados_node)
380             return self.cloud_node.id
381         else:
382             return None
383
384     def update_cloud_node(self, cloud_node):
385         if cloud_node is not None:
386             self.cloud_node = cloud_node
387             self._later.consider_shutdown()
388
389     def update_arvados_node(self, arvados_node):
390         if arvados_node is not None:
391             self.arvados_node = arvados_node
392             new_hostname = arvados_node_fqdn(self.arvados_node)
393             if new_hostname != self.cloud_node.name:
394                 self._update.sync_node(self.cloud_node, self.arvados_node)
395             self._later.consider_shutdown()