5714: Node Manager setup process retries Arvados errors.
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
14 from ...clientactor import _notify_subscribers
15 from ... import config
16
17 class ComputeNodeStateChangeBase(config.actor_class):
18     """Base class for actors that change a compute node's state.
19
20     This base class takes care of retrying changes and notifying
21     subscribers when the change is finished.
22     """
23     def __init__(self, logger_name, cloud_client, timer_actor,
24                  retry_wait, max_retry_wait):
25         super(ComputeNodeStateChangeBase, self).__init__()
26         self._later = self.actor_ref.proxy()
27         self._logger = logging.getLogger(logger_name)
28         self._cloud = cloud_client
29         self._timer = timer_actor
30         self.min_retry_wait = retry_wait
31         self.max_retry_wait = max_retry_wait
32         self.retry_wait = retry_wait
33         self.subscribers = set()
34
35     @staticmethod
36     def _retry(errors=()):
37         """Retry decorator for an actor method that makes remote requests.
38
39         Use this function to decorator an actor method, and pass in a
40         tuple of exceptions to catch.  This decorator will schedule
41         retries of that method with exponential backoff if the
42         original method raises a known cloud driver error, or any of the
43         given exception types.
44         """
45         def decorator(orig_func):
46             @functools.wraps(orig_func)
47             def retry_wrapper(self, *args, **kwargs):
48                 start_time = time.time()
49                 try:
50                     orig_func(self, *args, **kwargs)
51                 except Exception as error:
52                     if not (isinstance(error, errors) or
53                             self._cloud.is_cloud_exception(error)):
54                         raise
55                     self._logger.warning(
56                         "Client error: %s - waiting %s seconds",
57                         error, self.retry_wait)
58                     self._timer.schedule(start_time + self.retry_wait,
59                                          getattr(self._later,
60                                                  orig_func.__name__),
61                                          *args, **kwargs)
62                     self.retry_wait = min(self.retry_wait * 2,
63                                           self.max_retry_wait)
64                 else:
65                     self.retry_wait = self.min_retry_wait
66             return retry_wrapper
67         return decorator
68
69     def _finished(self):
70         _notify_subscribers(self._later, self.subscribers)
71         self.subscribers = None
72
73     def subscribe(self, subscriber):
74         if self.subscribers is None:
75             try:
76                 subscriber(self._later)
77             except pykka.ActorDeadError:
78                 pass
79         else:
80             self.subscribers.add(subscriber)
81
82
83 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
84     """Actor to create and set up a cloud compute node.
85
86     This actor prepares an Arvados node record for a new compute node
87     (either creating one or cleaning one passed in), then boots the
88     actual compute node.  It notifies subscribers when the cloud node
89     is successfully created (the last step in the process for Node
90     Manager to handle).
91     """
92     def __init__(self, timer_actor, arvados_client, cloud_client,
93                  cloud_size, arvados_node=None,
94                  retry_wait=1, max_retry_wait=180):
95         super(ComputeNodeSetupActor, self).__init__(
96             'arvnodeman.nodeup', cloud_client, timer_actor,
97             retry_wait, max_retry_wait)
98         self._arvados = arvados_client
99         self.cloud_size = cloud_size
100         self.arvados_node = None
101         self.cloud_node = None
102         if arvados_node is None:
103             self._later.create_arvados_node()
104         else:
105             self._later.prepare_arvados_node(arvados_node)
106
107     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
108     def create_arvados_node(self):
109         self.arvados_node = self._arvados.nodes().create(body={}).execute()
110         self._later.create_cloud_node()
111
112     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
113     def prepare_arvados_node(self, node):
114         self.arvados_node = self._arvados.nodes().update(
115             uuid=node['uuid'],
116             body={'hostname': None,
117                   'ip_address': None,
118                   'slot_number': None,
119                   'first_ping_at': None,
120                   'last_ping_at': None,
121                   'info': {'ec2_instance_id': None,
122                            'last_action': "Prepared by Node Manager"}}
123             ).execute()
124         self._later.create_cloud_node()
125
126     @ComputeNodeStateChangeBase._retry()
127     def create_cloud_node(self):
128         self._logger.info("Creating cloud node with size %s.",
129                           self.cloud_size.name)
130         self.cloud_node = self._cloud.create_node(self.cloud_size,
131                                                   self.arvados_node)
132         self._logger.info("Cloud node %s created.", self.cloud_node.id)
133         self._later.post_create()
134
135     @ComputeNodeStateChangeBase._retry()
136     def post_create(self):
137         self._cloud.post_create_node(self.cloud_node)
138         self._logger.info("%s post-create work done.", self.cloud_node.id)
139         self._finished()
140
141     def stop_if_no_cloud_node(self):
142         if self.cloud_node is None:
143             self.stop()
144
145
146 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
147     """Actor to shut down a compute node.
148
149     This actor simply destroys a cloud node, retrying as needed.
150     """
151     def __init__(self, timer_actor, cloud_client, node_monitor,
152                  cancellable=True, retry_wait=1, max_retry_wait=180):
153         # If a ShutdownActor is cancellable, it will ask the
154         # ComputeNodeMonitorActor if it's still eligible before taking each
155         # action, and stop the shutdown process if the node is no longer
156         # eligible.  Normal shutdowns based on job demand should be
157         # cancellable; shutdowns based on node misbehavior should not.
158         super(ComputeNodeShutdownActor, self).__init__(
159             'arvnodeman.nodedown', cloud_client, timer_actor,
160             retry_wait, max_retry_wait)
161         self._monitor = node_monitor.proxy()
162         self.cloud_node = self._monitor.cloud_node.get()
163         self.cancellable = cancellable
164         self.success = None
165
166     def on_start(self):
167         self._later.shutdown_node()
168
169     def cancel_shutdown(self):
170         self.success = False
171         self._finished()
172
173     def _stop_if_window_closed(orig_func):
174         @functools.wraps(orig_func)
175         def stop_wrapper(self, *args, **kwargs):
176             if (self.cancellable and
177                   (not self._monitor.shutdown_eligible().get())):
178                 self._logger.info(
179                     "Cloud node %s shutdown cancelled - no longer eligible.",
180                     self.cloud_node.id)
181                 self._later.cancel_shutdown()
182                 return None
183             else:
184                 return orig_func(self, *args, **kwargs)
185         return stop_wrapper
186
187     @_stop_if_window_closed
188     @ComputeNodeStateChangeBase._retry()
189     def shutdown_node(self):
190         if self._cloud.destroy_node(self.cloud_node):
191             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
192             self.success = True
193             self._finished()
194         else:
195             # Force a retry.
196             raise cloud_types.LibcloudError("destroy_node failed")
197
198     # Make the decorator available to subclasses.
199     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
200
201
202 class ComputeNodeUpdateActor(config.actor_class):
203     """Actor to dispatch one-off cloud management requests.
204
205     This actor receives requests for small cloud updates, and
206     dispatches them to a real driver.  ComputeNodeMonitorActors use
207     this to perform maintenance tasks on themselves.  Having a
208     dedicated actor for this gives us the opportunity to control the
209     flow of requests; e.g., by backing off when errors occur.
210
211     This actor is most like a "traditional" Pykka actor: there's no
212     subscribing, but instead methods return real driver results.  If
213     you're interested in those results, you should get them from the
214     Future that the proxy method returns.  Be prepared to handle exceptions
215     from the cloud driver when you do.
216     """
217     def __init__(self, cloud_factory, max_retry_wait=180):
218         super(ComputeNodeUpdateActor, self).__init__()
219         self._cloud = cloud_factory()
220         self.max_retry_wait = max_retry_wait
221         self.error_streak = 0
222         self.next_request_time = time.time()
223
224     def _throttle_errors(orig_func):
225         @functools.wraps(orig_func)
226         def throttle_wrapper(self, *args, **kwargs):
227             throttle_time = self.next_request_time - time.time()
228             if throttle_time > 0:
229                 time.sleep(throttle_time)
230             self.next_request_time = time.time()
231             try:
232                 result = orig_func(self, *args, **kwargs)
233             except Exception as error:
234                 self.error_streak += 1
235                 self.next_request_time += min(2 ** self.error_streak,
236                                               self.max_retry_wait)
237                 raise
238             else:
239                 self.error_streak = 0
240                 return result
241         return throttle_wrapper
242
243     @_throttle_errors
244     def sync_node(self, cloud_node, arvados_node):
245         return self._cloud.sync_node(cloud_node, arvados_node)
246
247
248 class ComputeNodeMonitorActor(config.actor_class):
249     """Actor to manage a running compute node.
250
251     This actor gets updates about a compute node's cloud and Arvados records.
252     It uses this information to notify subscribers when the node is eligible
253     for shutdown.
254     """
255     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
256                  cloud_fqdn_func, timer_actor, update_actor, arvados_node=None,
257                  poll_stale_after=600, node_stale_after=3600):
258         super(ComputeNodeMonitorActor, self).__init__()
259         self._later = self.actor_ref.proxy()
260         self._logger = logging.getLogger('arvnodeman.computenode')
261         self._last_log = None
262         self._shutdowns = shutdown_timer
263         self._cloud_node_fqdn = cloud_fqdn_func
264         self._timer = timer_actor
265         self._update = update_actor
266         self.cloud_node = cloud_node
267         self.cloud_node_start_time = cloud_node_start_time
268         self.poll_stale_after = poll_stale_after
269         self.node_stale_after = node_stale_after
270         self.subscribers = set()
271         self.arvados_node = None
272         self._later.update_arvados_node(arvados_node)
273         self.last_shutdown_opening = None
274         self._later.consider_shutdown()
275
276     def subscribe(self, subscriber):
277         self.subscribers.add(subscriber)
278
279     def _debug(self, msg, *args):
280         if msg == self._last_log:
281             return
282         self._last_log = msg
283         self._logger.debug(msg, *args)
284
285     def in_state(self, *states):
286         # Return a boolean to say whether or not our Arvados node record is in
287         # one of the given states.  If state information is not
288         # available--because this node has no Arvados record, the record is
289         # stale, or the record has no state information--return None.
290         if (self.arvados_node is None) or not timestamp_fresh(
291               arvados_node_mtime(self.arvados_node), self.node_stale_after):
292             return None
293         state = self.arvados_node['crunch_worker_state']
294         if not state:
295             return None
296         result = state in states
297         if state == 'idle':
298             result = result and not self.arvados_node['job_uuid']
299         return result
300
301     def shutdown_eligible(self):
302         if not self._shutdowns.window_open():
303             return False
304         elif self.arvados_node is None:
305             # If this is a new, unpaired node, it's eligible for
306             # shutdown--we figure there was an error during bootstrap.
307             return timestamp_fresh(self.cloud_node_start_time,
308                                    self.node_stale_after)
309         else:
310             return self.in_state('idle')
311
312     def consider_shutdown(self):
313         next_opening = self._shutdowns.next_opening()
314         if self.shutdown_eligible():
315             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
316             _notify_subscribers(self._later, self.subscribers)
317         elif self._shutdowns.window_open():
318             self._debug("Node %s shutdown window open but node busy.",
319                         self.cloud_node.id)
320         elif self.last_shutdown_opening != next_opening:
321             self._debug("Node %s shutdown window closed.  Next at %s.",
322                         self.cloud_node.id, time.ctime(next_opening))
323             self._timer.schedule(next_opening, self._later.consider_shutdown)
324             self.last_shutdown_opening = next_opening
325
326     def offer_arvados_pair(self, arvados_node):
327         first_ping_s = arvados_node.get('first_ping_at')
328         if (self.arvados_node is not None) or (not first_ping_s):
329             return None
330         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
331               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
332             self._later.update_arvados_node(arvados_node)
333             return self.cloud_node.id
334         else:
335             return None
336
337     def update_cloud_node(self, cloud_node):
338         if cloud_node is not None:
339             self.cloud_node = cloud_node
340             self._later.consider_shutdown()
341
342     def update_arvados_node(self, arvados_node):
343         # If the cloud node's FQDN doesn't match what's in the Arvados node
344         # record, make them match.
345         # This method is a little unusual in the way it just fires off the
346         # request without checking the result or retrying errors.  That's
347         # because this update happens every time we reload the Arvados node
348         # list: if a previous sync attempt failed, we'll see that the names
349         # are out of sync and just try again.  ComputeNodeUpdateActor has
350         # the logic to throttle those effective retries when there's trouble.
351         if arvados_node is not None:
352             self.arvados_node = arvados_node
353             if (self._cloud_node_fqdn(self.cloud_node) !=
354                   arvados_node_fqdn(self.arvados_node)):
355                 self._update.sync_node(self.cloud_node, self.arvados_node)
356             self._later.consider_shutdown()