Merge branch 'master' into 5538-arvadosclient-retry
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
14 from ...clientactor import _notify_subscribers
15 from ... import config
16
17 class ComputeNodeStateChangeBase(config.actor_class):
18     """Base class for actors that change a compute node's state.
19
20     This base class takes care of retrying changes and notifying
21     subscribers when the change is finished.
22     """
23     def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
24                  retry_wait, max_retry_wait):
25         super(ComputeNodeStateChangeBase, self).__init__()
26         self._later = self.actor_ref.proxy()
27         self._logger = logging.getLogger(logger_name)
28         self._cloud = cloud_client
29         self._arvados = arvados_client
30         self._timer = timer_actor
31         self.min_retry_wait = retry_wait
32         self.max_retry_wait = max_retry_wait
33         self.retry_wait = retry_wait
34         self.subscribers = set()
35
36     @staticmethod
37     def _retry(errors=()):
38         """Retry decorator for an actor method that makes remote requests.
39
40         Use this function to decorator an actor method, and pass in a
41         tuple of exceptions to catch.  This decorator will schedule
42         retries of that method with exponential backoff if the
43         original method raises a known cloud driver error, or any of the
44         given exception types.
45         """
46         def decorator(orig_func):
47             @functools.wraps(orig_func)
48             def retry_wrapper(self, *args, **kwargs):
49                 start_time = time.time()
50                 try:
51                     orig_func(self, *args, **kwargs)
52                 except Exception as error:
53                     if not (isinstance(error, errors) or
54                             self._cloud.is_cloud_exception(error)):
55                         raise
56                     self._logger.warning(
57                         "Client error: %s - waiting %s seconds",
58                         error, self.retry_wait)
59                     self._timer.schedule(start_time + self.retry_wait,
60                                          getattr(self._later,
61                                                  orig_func.__name__),
62                                          *args, **kwargs)
63                     self.retry_wait = min(self.retry_wait * 2,
64                                           self.max_retry_wait)
65                 else:
66                     self.retry_wait = self.min_retry_wait
67             return retry_wrapper
68         return decorator
69
70     def _finished(self):
71         _notify_subscribers(self._later, self.subscribers)
72         self.subscribers = None
73
74     def subscribe(self, subscriber):
75         if self.subscribers is None:
76             try:
77                 subscriber(self._later)
78             except pykka.ActorDeadError:
79                 pass
80         else:
81             self.subscribers.add(subscriber)
82
83     def _clean_arvados_node(self, arvados_node, explanation):
84         return self._arvados.nodes().update(
85             uuid=arvados_node['uuid'],
86             body={'hostname': None,
87                   'ip_address': None,
88                   'slot_number': None,
89                   'first_ping_at': None,
90                   'last_ping_at': None,
91                   'info': {'ec2_instance_id': None,
92                            'last_action': explanation}},
93             ).execute()
94
95
96 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
97     """Actor to create and set up a cloud compute node.
98
99     This actor prepares an Arvados node record for a new compute node
100     (either creating one or cleaning one passed in), then boots the
101     actual compute node.  It notifies subscribers when the cloud node
102     is successfully created (the last step in the process for Node
103     Manager to handle).
104     """
105     def __init__(self, timer_actor, arvados_client, cloud_client,
106                  cloud_size, arvados_node=None,
107                  retry_wait=1, max_retry_wait=180):
108         super(ComputeNodeSetupActor, self).__init__(
109             'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
110             retry_wait, max_retry_wait)
111         self.cloud_size = cloud_size
112         self.arvados_node = None
113         self.cloud_node = None
114         if arvados_node is None:
115             self._later.create_arvados_node()
116         else:
117             self._later.prepare_arvados_node(arvados_node)
118
119     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
120     def create_arvados_node(self):
121         self.arvados_node = self._arvados.nodes().create(body={}).execute()
122         self._later.create_cloud_node()
123
124     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
125     def prepare_arvados_node(self, node):
126         self.arvados_node = self._clean_arvados_node(
127             node, "Prepared by Node Manager")
128         self._later.create_cloud_node()
129
130     @ComputeNodeStateChangeBase._retry()
131     def create_cloud_node(self):
132         self._logger.info("Creating cloud node with size %s.",
133                           self.cloud_size.name)
134         self.cloud_node = self._cloud.create_node(self.cloud_size,
135                                                   self.arvados_node)
136         self._logger.info("Cloud node %s created.", self.cloud_node.id)
137         self._later.post_create()
138
139     @ComputeNodeStateChangeBase._retry()
140     def post_create(self):
141         self._cloud.post_create_node(self.cloud_node)
142         self._logger.info("%s post-create work done.", self.cloud_node.id)
143         self._finished()
144
145     def stop_if_no_cloud_node(self):
146         if self.cloud_node is not None:
147             return False
148         self.stop()
149         return True
150
151
152 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
153     """Actor to shut down a compute node.
154
155     This actor simply destroys a cloud node, retrying as needed.
156     """
157     # Reasons for a shutdown to be cancelled.
158     WINDOW_CLOSED = "shutdown window closed"
159     NODE_BROKEN = "cloud failed to shut down broken node"
160
161     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
162                  cancellable=True, retry_wait=1, max_retry_wait=180):
163         # If a ShutdownActor is cancellable, it will ask the
164         # ComputeNodeMonitorActor if it's still eligible before taking each
165         # action, and stop the shutdown process if the node is no longer
166         # eligible.  Normal shutdowns based on job demand should be
167         # cancellable; shutdowns based on node misbehavior should not.
168         super(ComputeNodeShutdownActor, self).__init__(
169             'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
170             retry_wait, max_retry_wait)
171         self._monitor = node_monitor.proxy()
172         self.cloud_node = self._monitor.cloud_node.get()
173         self.cancellable = cancellable
174         self.cancel_reason = None
175         self.success = None
176
177     def on_start(self):
178         self._later.shutdown_node()
179
180     def _arvados_node(self):
181         return self._monitor.arvados_node.get()
182
183     def _finished(self, success_flag=None):
184         if success_flag is not None:
185             self.success = success_flag
186         return super(ComputeNodeShutdownActor, self)._finished()
187
188     def cancel_shutdown(self, reason):
189         self.cancel_reason = reason
190         self._logger.info("Cloud node %s shutdown cancelled: %s.",
191                           self.cloud_node.id, reason)
192         self._finished(success_flag=False)
193
194     def _stop_if_window_closed(orig_func):
195         @functools.wraps(orig_func)
196         def stop_wrapper(self, *args, **kwargs):
197             if (self.cancellable and
198                   (not self._monitor.shutdown_eligible().get())):
199                 self._later.cancel_shutdown(self.WINDOW_CLOSED)
200                 return None
201             else:
202                 return orig_func(self, *args, **kwargs)
203         return stop_wrapper
204
205     @_stop_if_window_closed
206     @ComputeNodeStateChangeBase._retry()
207     def shutdown_node(self):
208         if not self._cloud.destroy_node(self.cloud_node):
209             if self._cloud.broken(self.cloud_node):
210                 self._later.cancel_shutdown(self.NODE_BROKEN)
211             else:
212                 # Force a retry.
213                 raise cloud_types.LibcloudError("destroy_node failed")
214         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
215         arv_node = self._arvados_node()
216         if arv_node is None:
217             self._finished(success_flag=True)
218         else:
219             self._later.clean_arvados_node(arv_node)
220
221     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
222     def clean_arvados_node(self, arvados_node):
223         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
224         self._finished(success_flag=True)
225
226     # Make the decorator available to subclasses.
227     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
228
229
230 class ComputeNodeUpdateActor(config.actor_class):
231     """Actor to dispatch one-off cloud management requests.
232
233     This actor receives requests for small cloud updates, and
234     dispatches them to a real driver.  ComputeNodeMonitorActors use
235     this to perform maintenance tasks on themselves.  Having a
236     dedicated actor for this gives us the opportunity to control the
237     flow of requests; e.g., by backing off when errors occur.
238
239     This actor is most like a "traditional" Pykka actor: there's no
240     subscribing, but instead methods return real driver results.  If
241     you're interested in those results, you should get them from the
242     Future that the proxy method returns.  Be prepared to handle exceptions
243     from the cloud driver when you do.
244     """
245     def __init__(self, cloud_factory, max_retry_wait=180):
246         super(ComputeNodeUpdateActor, self).__init__()
247         self._cloud = cloud_factory()
248         self.max_retry_wait = max_retry_wait
249         self.error_streak = 0
250         self.next_request_time = time.time()
251
252     def _throttle_errors(orig_func):
253         @functools.wraps(orig_func)
254         def throttle_wrapper(self, *args, **kwargs):
255             throttle_time = self.next_request_time - time.time()
256             if throttle_time > 0:
257                 time.sleep(throttle_time)
258             self.next_request_time = time.time()
259             try:
260                 result = orig_func(self, *args, **kwargs)
261             except Exception as error:
262                 self.error_streak += 1
263                 self.next_request_time += min(2 ** self.error_streak,
264                                               self.max_retry_wait)
265                 raise
266             else:
267                 self.error_streak = 0
268                 return result
269         return throttle_wrapper
270
271     @_throttle_errors
272     def sync_node(self, cloud_node, arvados_node):
273         return self._cloud.sync_node(cloud_node, arvados_node)
274
275
276 class ComputeNodeMonitorActor(config.actor_class):
277     """Actor to manage a running compute node.
278
279     This actor gets updates about a compute node's cloud and Arvados records.
280     It uses this information to notify subscribers when the node is eligible
281     for shutdown.
282     """
283     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
284                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
285                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
286                  boot_fail_after=1800
287     ):
288         super(ComputeNodeMonitorActor, self).__init__()
289         self._later = self.actor_ref.proxy()
290         self._logger = logging.getLogger('arvnodeman.computenode')
291         self._last_log = None
292         self._shutdowns = shutdown_timer
293         self._cloud_node_fqdn = cloud_fqdn_func
294         self._timer = timer_actor
295         self._update = update_actor
296         self._cloud = cloud_client
297         self.cloud_node = cloud_node
298         self.cloud_node_start_time = cloud_node_start_time
299         self.poll_stale_after = poll_stale_after
300         self.node_stale_after = node_stale_after
301         self.boot_fail_after = boot_fail_after
302         self.subscribers = set()
303         self.arvados_node = None
304         self._later.update_arvados_node(arvados_node)
305         self.last_shutdown_opening = None
306         self._later.consider_shutdown()
307
308     def subscribe(self, subscriber):
309         self.subscribers.add(subscriber)
310
311     def _debug(self, msg, *args):
312         if msg == self._last_log:
313             return
314         self._last_log = msg
315         self._logger.debug(msg, *args)
316
317     def in_state(self, *states):
318         # Return a boolean to say whether or not our Arvados node record is in
319         # one of the given states.  If state information is not
320         # available--because this node has no Arvados record, the record is
321         # stale, or the record has no state information--return None.
322         if (self.arvados_node is None) or not timestamp_fresh(
323               arvados_node_mtime(self.arvados_node), self.node_stale_after):
324             return None
325         state = self.arvados_node['crunch_worker_state']
326         if not state:
327             return None
328         result = state in states
329         if state == 'idle':
330             result = result and not self.arvados_node['job_uuid']
331         return result
332
333     def shutdown_eligible(self):
334         if not self._shutdowns.window_open():
335             return False
336         if self.arvados_node is None:
337             # Node is unpaired.
338             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
339             return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
340         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
341         if missing and self._cloud.broken(self.cloud_node):
342             # Node is paired, but Arvados says it is missing and the cloud says the node
343             # is in an error state, so shut it down.
344             return True
345         if missing is None and self._cloud.broken(self.cloud_node):
346             self._logger.warning(
347                 "cloud reports broken node, but paired node %s never pinged "
348                 "(bug?) -- skipped check for node_stale_after",
349                 self.arvados_node['uuid'])
350         return self.in_state('idle')
351
352     def consider_shutdown(self):
353         next_opening = self._shutdowns.next_opening()
354         if self.shutdown_eligible():
355             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
356             _notify_subscribers(self._later, self.subscribers)
357         elif self._shutdowns.window_open():
358             self._debug("Node %s shutdown window open but node busy.",
359                         self.cloud_node.id)
360         elif self.last_shutdown_opening != next_opening:
361             self._debug("Node %s shutdown window closed.  Next at %s.",
362                         self.cloud_node.id, time.ctime(next_opening))
363             self._timer.schedule(next_opening, self._later.consider_shutdown)
364             self.last_shutdown_opening = next_opening
365
366     def offer_arvados_pair(self, arvados_node):
367         first_ping_s = arvados_node.get('first_ping_at')
368         if (self.arvados_node is not None) or (not first_ping_s):
369             return None
370         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
371               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
372             self._later.update_arvados_node(arvados_node)
373             return self.cloud_node.id
374         else:
375             return None
376
377     def update_cloud_node(self, cloud_node):
378         if cloud_node is not None:
379             self.cloud_node = cloud_node
380             self._later.consider_shutdown()
381
382     def update_arvados_node(self, arvados_node):
383         # If the cloud node's FQDN doesn't match what's in the Arvados node
384         # record, make them match.
385         # This method is a little unusual in the way it just fires off the
386         # request without checking the result or retrying errors.  That's
387         # because this update happens every time we reload the Arvados node
388         # list: if a previous sync attempt failed, we'll see that the names
389         # are out of sync and just try again.  ComputeNodeUpdateActor has
390         # the logic to throttle those effective retries when there's trouble.
391         if arvados_node is not None:
392             self.arvados_node = arvados_node
393             if (self._cloud_node_fqdn(self.cloud_node) !=
394                   arvados_node_fqdn(self.arvados_node)):
395                 self._update.sync_node(self.cloud_node, self.arvados_node)
396             self._later.consider_shutdown()