Merge branch 'master' into wtsi-hgi-feature/arv-view
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
14 from ...clientactor import _notify_subscribers
15 from ... import config
16
17 class ComputeNodeStateChangeBase(config.actor_class):
18     """Base class for actors that change a compute node's state.
19
20     This base class takes care of retrying changes and notifying
21     subscribers when the change is finished.
22     """
23     def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
24                  retry_wait, max_retry_wait):
25         super(ComputeNodeStateChangeBase, self).__init__()
26         self._later = self.actor_ref.proxy()
27         self._logger = logging.getLogger(logger_name)
28         self._cloud = cloud_client
29         self._arvados = arvados_client
30         self._timer = timer_actor
31         self.min_retry_wait = retry_wait
32         self.max_retry_wait = max_retry_wait
33         self.retry_wait = retry_wait
34         self.subscribers = set()
35
36     @staticmethod
37     def _retry(errors=()):
38         """Retry decorator for an actor method that makes remote requests.
39
40         Use this function to decorator an actor method, and pass in a
41         tuple of exceptions to catch.  This decorator will schedule
42         retries of that method with exponential backoff if the
43         original method raises a known cloud driver error, or any of the
44         given exception types.
45         """
46         def decorator(orig_func):
47             @functools.wraps(orig_func)
48             def retry_wrapper(self, *args, **kwargs):
49                 start_time = time.time()
50                 try:
51                     orig_func(self, *args, **kwargs)
52                 except Exception as error:
53                     if not (isinstance(error, errors) or
54                             self._cloud.is_cloud_exception(error)):
55                         raise
56                     self._logger.warning(
57                         "Client error: %s - waiting %s seconds",
58                         error, self.retry_wait)
59                     self._timer.schedule(start_time + self.retry_wait,
60                                          getattr(self._later,
61                                                  orig_func.__name__),
62                                          *args, **kwargs)
63                     self.retry_wait = min(self.retry_wait * 2,
64                                           self.max_retry_wait)
65                 else:
66                     self.retry_wait = self.min_retry_wait
67             return retry_wrapper
68         return decorator
69
70     def _finished(self):
71         _notify_subscribers(self._later, self.subscribers)
72         self.subscribers = None
73
74     def subscribe(self, subscriber):
75         if self.subscribers is None:
76             try:
77                 subscriber(self._later)
78             except pykka.ActorDeadError:
79                 pass
80         else:
81             self.subscribers.add(subscriber)
82
83     def _clean_arvados_node(self, arvados_node, explanation):
84         return self._arvados.nodes().update(
85             uuid=arvados_node['uuid'],
86             body={'hostname': None,
87                   'ip_address': None,
88                   'slot_number': None,
89                   'first_ping_at': None,
90                   'last_ping_at': None,
91                   'info': {'ec2_instance_id': None,
92                            'last_action': explanation}},
93             ).execute()
94
95
96 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
97     """Actor to create and set up a cloud compute node.
98
99     This actor prepares an Arvados node record for a new compute node
100     (either creating one or cleaning one passed in), then boots the
101     actual compute node.  It notifies subscribers when the cloud node
102     is successfully created (the last step in the process for Node
103     Manager to handle).
104     """
105     def __init__(self, timer_actor, arvados_client, cloud_client,
106                  cloud_size, arvados_node=None,
107                  retry_wait=1, max_retry_wait=180):
108         super(ComputeNodeSetupActor, self).__init__(
109             'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
110             retry_wait, max_retry_wait)
111         self.cloud_size = cloud_size
112         self.arvados_node = None
113         self.cloud_node = None
114         if arvados_node is None:
115             self._later.create_arvados_node()
116         else:
117             self._later.prepare_arvados_node(arvados_node)
118
119     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
120     def create_arvados_node(self):
121         self.arvados_node = self._arvados.nodes().create(body={}).execute()
122         self._later.create_cloud_node()
123
124     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
125     def prepare_arvados_node(self, node):
126         self.arvados_node = self._clean_arvados_node(
127             node, "Prepared by Node Manager")
128         self._later.create_cloud_node()
129
130     @ComputeNodeStateChangeBase._retry()
131     def create_cloud_node(self):
132         self._logger.info("Creating cloud node with size %s.",
133                           self.cloud_size.name)
134         self.cloud_node = self._cloud.create_node(self.cloud_size,
135                                                   self.arvados_node)
136         self._logger.info("Cloud node %s created.", self.cloud_node.id)
137         self._later.post_create()
138
139     @ComputeNodeStateChangeBase._retry()
140     def post_create(self):
141         self._cloud.post_create_node(self.cloud_node)
142         self._logger.info("%s post-create work done.", self.cloud_node.id)
143         self._finished()
144
145     def stop_if_no_cloud_node(self):
146         if self.cloud_node is not None:
147             return False
148         self.stop()
149         return True
150
151
152 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
153     """Actor to shut down a compute node.
154
155     This actor simply destroys a cloud node, retrying as needed.
156     """
157     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
158                  cancellable=True, retry_wait=1, max_retry_wait=180):
159         # If a ShutdownActor is cancellable, it will ask the
160         # ComputeNodeMonitorActor if it's still eligible before taking each
161         # action, and stop the shutdown process if the node is no longer
162         # eligible.  Normal shutdowns based on job demand should be
163         # cancellable; shutdowns based on node misbehavior should not.
164         super(ComputeNodeShutdownActor, self).__init__(
165             'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
166             retry_wait, max_retry_wait)
167         self._monitor = node_monitor.proxy()
168         self.cloud_node = self._monitor.cloud_node.get()
169         self.cancellable = cancellable
170         self.success = None
171
172     def on_start(self):
173         self._later.shutdown_node()
174
175     def _arvados_node(self):
176         return self._monitor.arvados_node.get()
177
178     def _finished(self, success_flag=None):
179         if success_flag is not None:
180             self.success = success_flag
181         return super(ComputeNodeShutdownActor, self)._finished()
182
183     def cancel_shutdown(self):
184         self._finished(success_flag=False)
185
186     def _stop_if_window_closed(orig_func):
187         @functools.wraps(orig_func)
188         def stop_wrapper(self, *args, **kwargs):
189             if (self.cancellable and
190                   (not self._monitor.shutdown_eligible().get())):
191                 self._logger.info(
192                     "Cloud node %s shutdown cancelled - no longer eligible.",
193                     self.cloud_node.id)
194                 self._later.cancel_shutdown()
195                 return None
196             else:
197                 return orig_func(self, *args, **kwargs)
198         return stop_wrapper
199
200     @_stop_if_window_closed
201     @ComputeNodeStateChangeBase._retry()
202     def shutdown_node(self):
203         if not self._cloud.destroy_node(self.cloud_node):
204             # Force a retry.
205             raise cloud_types.LibcloudError("destroy_node failed")
206         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
207         arv_node = self._arvados_node()
208         if arv_node is None:
209             self._finished(success_flag=True)
210         else:
211             self._later.clean_arvados_node(arv_node)
212
213     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
214     def clean_arvados_node(self, arvados_node):
215         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
216         self._finished(success_flag=True)
217
218     # Make the decorator available to subclasses.
219     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
220
221
222 class ComputeNodeUpdateActor(config.actor_class):
223     """Actor to dispatch one-off cloud management requests.
224
225     This actor receives requests for small cloud updates, and
226     dispatches them to a real driver.  ComputeNodeMonitorActors use
227     this to perform maintenance tasks on themselves.  Having a
228     dedicated actor for this gives us the opportunity to control the
229     flow of requests; e.g., by backing off when errors occur.
230
231     This actor is most like a "traditional" Pykka actor: there's no
232     subscribing, but instead methods return real driver results.  If
233     you're interested in those results, you should get them from the
234     Future that the proxy method returns.  Be prepared to handle exceptions
235     from the cloud driver when you do.
236     """
237     def __init__(self, cloud_factory, max_retry_wait=180):
238         super(ComputeNodeUpdateActor, self).__init__()
239         self._cloud = cloud_factory()
240         self.max_retry_wait = max_retry_wait
241         self.error_streak = 0
242         self.next_request_time = time.time()
243
244     def _throttle_errors(orig_func):
245         @functools.wraps(orig_func)
246         def throttle_wrapper(self, *args, **kwargs):
247             throttle_time = self.next_request_time - time.time()
248             if throttle_time > 0:
249                 time.sleep(throttle_time)
250             self.next_request_time = time.time()
251             try:
252                 result = orig_func(self, *args, **kwargs)
253             except Exception as error:
254                 self.error_streak += 1
255                 self.next_request_time += min(2 ** self.error_streak,
256                                               self.max_retry_wait)
257                 raise
258             else:
259                 self.error_streak = 0
260                 return result
261         return throttle_wrapper
262
263     @_throttle_errors
264     def sync_node(self, cloud_node, arvados_node):
265         return self._cloud.sync_node(cloud_node, arvados_node)
266
267
268 class ComputeNodeMonitorActor(config.actor_class):
269     """Actor to manage a running compute node.
270
271     This actor gets updates about a compute node's cloud and Arvados records.
272     It uses this information to notify subscribers when the node is eligible
273     for shutdown.
274     """
275     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
276                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
277                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
278                  boot_fail_after=1800
279     ):
280         super(ComputeNodeMonitorActor, self).__init__()
281         self._later = self.actor_ref.proxy()
282         self._logger = logging.getLogger('arvnodeman.computenode')
283         self._last_log = None
284         self._shutdowns = shutdown_timer
285         self._cloud_node_fqdn = cloud_fqdn_func
286         self._timer = timer_actor
287         self._update = update_actor
288         self._cloud = cloud_client
289         self.cloud_node = cloud_node
290         self.cloud_node_start_time = cloud_node_start_time
291         self.poll_stale_after = poll_stale_after
292         self.node_stale_after = node_stale_after
293         self.boot_fail_after = boot_fail_after
294         self.subscribers = set()
295         self.arvados_node = None
296         self._later.update_arvados_node(arvados_node)
297         self.last_shutdown_opening = None
298         self._later.consider_shutdown()
299
300     def subscribe(self, subscriber):
301         self.subscribers.add(subscriber)
302
303     def _debug(self, msg, *args):
304         if msg == self._last_log:
305             return
306         self._last_log = msg
307         self._logger.debug(msg, *args)
308
309     def in_state(self, *states):
310         # Return a boolean to say whether or not our Arvados node record is in
311         # one of the given states.  If state information is not
312         # available--because this node has no Arvados record, the record is
313         # stale, or the record has no state information--return None.
314         if (self.arvados_node is None) or not timestamp_fresh(
315               arvados_node_mtime(self.arvados_node), self.node_stale_after):
316             return None
317         state = self.arvados_node['crunch_worker_state']
318         if not state:
319             return None
320         result = state in states
321         if state == 'idle':
322             result = result and not self.arvados_node['job_uuid']
323         return result
324
325     def shutdown_eligible(self):
326         if not self._shutdowns.window_open():
327             return False
328         if self.arvados_node is None:
329             # Node is unpaired.
330             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
331             return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
332         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
333         if missing and self._cloud.broken(self.cloud_node):
334             # Node is paired, but Arvados says it is missing and the cloud says the node
335             # is in an error state, so shut it down.
336             return True
337         if missing is None and self._cloud.broken(self.cloud_node):
338             self._logger.warning(
339                 "cloud reports broken node, but paired node %s never pinged "
340                 "(bug?) -- skipped check for node_stale_after",
341                 self.arvados_node['uuid'])
342         return self.in_state('idle')
343
344     def consider_shutdown(self):
345         next_opening = self._shutdowns.next_opening()
346         if self.shutdown_eligible():
347             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
348             _notify_subscribers(self._later, self.subscribers)
349         elif self._shutdowns.window_open():
350             self._debug("Node %s shutdown window open but node busy.",
351                         self.cloud_node.id)
352         elif self.last_shutdown_opening != next_opening:
353             self._debug("Node %s shutdown window closed.  Next at %s.",
354                         self.cloud_node.id, time.ctime(next_opening))
355             self._timer.schedule(next_opening, self._later.consider_shutdown)
356             self.last_shutdown_opening = next_opening
357
358     def offer_arvados_pair(self, arvados_node):
359         first_ping_s = arvados_node.get('first_ping_at')
360         if (self.arvados_node is not None) or (not first_ping_s):
361             return None
362         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
363               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
364             self._later.update_arvados_node(arvados_node)
365             return self.cloud_node.id
366         else:
367             return None
368
369     def update_cloud_node(self, cloud_node):
370         if cloud_node is not None:
371             self.cloud_node = cloud_node
372             self._later.consider_shutdown()
373
374     def update_arvados_node(self, arvados_node):
375         # If the cloud node's FQDN doesn't match what's in the Arvados node
376         # record, make them match.
377         # This method is a little unusual in the way it just fires off the
378         # request without checking the result or retrying errors.  That's
379         # because this update happens every time we reload the Arvados node
380         # list: if a previous sync attempt failed, we'll see that the names
381         # are out of sync and just try again.  ComputeNodeUpdateActor has
382         # the logic to throttle those effective retries when there's trouble.
383         if arvados_node is not None:
384             self.arvados_node = arvados_node
385             if (self._cloud_node_fqdn(self.cloud_node) !=
386                   arvados_node_fqdn(self.arvados_node)):
387                 self._update.sync_node(self.cloud_node, self.arvados_node)
388             self._later.consider_shutdown()