5714: Avoid Node Manager race conditions around stop_if_no_cloud_node.
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
14 from ...clientactor import _notify_subscribers
15 from ... import config
16
17 class ComputeNodeStateChangeBase(config.actor_class):
18     """Base class for actors that change a compute node's state.
19
20     This base class takes care of retrying changes and notifying
21     subscribers when the change is finished.
22     """
23     def __init__(self, logger_name, cloud_client, timer_actor,
24                  retry_wait, max_retry_wait):
25         super(ComputeNodeStateChangeBase, self).__init__()
26         self._later = self.actor_ref.proxy()
27         self._logger = logging.getLogger(logger_name)
28         self._cloud = cloud_client
29         self._timer = timer_actor
30         self.min_retry_wait = retry_wait
31         self.max_retry_wait = max_retry_wait
32         self.retry_wait = retry_wait
33         self.subscribers = set()
34
35     @staticmethod
36     def _retry(errors=()):
37         """Retry decorator for an actor method that makes remote requests.
38
39         Use this function to decorator an actor method, and pass in a
40         tuple of exceptions to catch.  This decorator will schedule
41         retries of that method with exponential backoff if the
42         original method raises a known cloud driver error, or any of the
43         given exception types.
44         """
45         def decorator(orig_func):
46             @functools.wraps(orig_func)
47             def retry_wrapper(self, *args, **kwargs):
48                 start_time = time.time()
49                 try:
50                     orig_func(self, *args, **kwargs)
51                 except Exception as error:
52                     if not (isinstance(error, errors) or
53                             self._cloud.is_cloud_exception(error)):
54                         raise
55                     self._logger.warning(
56                         "Client error: %s - waiting %s seconds",
57                         error, self.retry_wait)
58                     self._timer.schedule(start_time + self.retry_wait,
59                                          getattr(self._later,
60                                                  orig_func.__name__),
61                                          *args, **kwargs)
62                     self.retry_wait = min(self.retry_wait * 2,
63                                           self.max_retry_wait)
64                 else:
65                     self.retry_wait = self.min_retry_wait
66             return retry_wrapper
67         return decorator
68
69     def _finished(self):
70         _notify_subscribers(self._later, self.subscribers)
71         self.subscribers = None
72
73     def subscribe(self, subscriber):
74         if self.subscribers is None:
75             try:
76                 subscriber(self._later)
77             except pykka.ActorDeadError:
78                 pass
79         else:
80             self.subscribers.add(subscriber)
81
82
83 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
84     """Actor to create and set up a cloud compute node.
85
86     This actor prepares an Arvados node record for a new compute node
87     (either creating one or cleaning one passed in), then boots the
88     actual compute node.  It notifies subscribers when the cloud node
89     is successfully created (the last step in the process for Node
90     Manager to handle).
91     """
92     def __init__(self, timer_actor, arvados_client, cloud_client,
93                  cloud_size, arvados_node=None,
94                  retry_wait=1, max_retry_wait=180):
95         super(ComputeNodeSetupActor, self).__init__(
96             'arvnodeman.nodeup', cloud_client, timer_actor,
97             retry_wait, max_retry_wait)
98         self._arvados = arvados_client
99         self.cloud_size = cloud_size
100         self.arvados_node = None
101         self.cloud_node = None
102         if arvados_node is None:
103             self._later.create_arvados_node()
104         else:
105             self._later.prepare_arvados_node(arvados_node)
106
107     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
108     def create_arvados_node(self):
109         self.arvados_node = self._arvados.nodes().create(body={}).execute()
110         self._later.create_cloud_node()
111
112     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
113     def prepare_arvados_node(self, node):
114         self.arvados_node = self._arvados.nodes().update(
115             uuid=node['uuid'],
116             body={'hostname': None,
117                   'ip_address': None,
118                   'slot_number': None,
119                   'first_ping_at': None,
120                   'last_ping_at': None,
121                   'info': {'ec2_instance_id': None,
122                            'last_action': "Prepared by Node Manager"}}
123             ).execute()
124         self._later.create_cloud_node()
125
126     @ComputeNodeStateChangeBase._retry()
127     def create_cloud_node(self):
128         self._logger.info("Creating cloud node with size %s.",
129                           self.cloud_size.name)
130         self.cloud_node = self._cloud.create_node(self.cloud_size,
131                                                   self.arvados_node)
132         self._logger.info("Cloud node %s created.", self.cloud_node.id)
133         self._later.post_create()
134
135     @ComputeNodeStateChangeBase._retry()
136     def post_create(self):
137         self._cloud.post_create_node(self.cloud_node)
138         self._logger.info("%s post-create work done.", self.cloud_node.id)
139         self._finished()
140
141     def stop_if_no_cloud_node(self):
142         if self.cloud_node is not None:
143             return False
144         self.stop()
145         return True
146
147
148 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
149     """Actor to shut down a compute node.
150
151     This actor simply destroys a cloud node, retrying as needed.
152     """
153     def __init__(self, timer_actor, cloud_client, node_monitor,
154                  cancellable=True, retry_wait=1, max_retry_wait=180):
155         # If a ShutdownActor is cancellable, it will ask the
156         # ComputeNodeMonitorActor if it's still eligible before taking each
157         # action, and stop the shutdown process if the node is no longer
158         # eligible.  Normal shutdowns based on job demand should be
159         # cancellable; shutdowns based on node misbehavior should not.
160         super(ComputeNodeShutdownActor, self).__init__(
161             'arvnodeman.nodedown', cloud_client, timer_actor,
162             retry_wait, max_retry_wait)
163         self._monitor = node_monitor.proxy()
164         self.cloud_node = self._monitor.cloud_node.get()
165         self.cancellable = cancellable
166         self.success = None
167
168     def on_start(self):
169         self._later.shutdown_node()
170
171     def cancel_shutdown(self):
172         self.success = False
173         self._finished()
174
175     def _stop_if_window_closed(orig_func):
176         @functools.wraps(orig_func)
177         def stop_wrapper(self, *args, **kwargs):
178             if (self.cancellable and
179                   (not self._monitor.shutdown_eligible().get())):
180                 self._logger.info(
181                     "Cloud node %s shutdown cancelled - no longer eligible.",
182                     self.cloud_node.id)
183                 self._later.cancel_shutdown()
184                 return None
185             else:
186                 return orig_func(self, *args, **kwargs)
187         return stop_wrapper
188
189     @_stop_if_window_closed
190     @ComputeNodeStateChangeBase._retry()
191     def shutdown_node(self):
192         if self._cloud.destroy_node(self.cloud_node):
193             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
194             self.success = True
195             self._finished()
196         else:
197             # Force a retry.
198             raise cloud_types.LibcloudError("destroy_node failed")
199
200     # Make the decorator available to subclasses.
201     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
202
203
204 class ComputeNodeUpdateActor(config.actor_class):
205     """Actor to dispatch one-off cloud management requests.
206
207     This actor receives requests for small cloud updates, and
208     dispatches them to a real driver.  ComputeNodeMonitorActors use
209     this to perform maintenance tasks on themselves.  Having a
210     dedicated actor for this gives us the opportunity to control the
211     flow of requests; e.g., by backing off when errors occur.
212
213     This actor is most like a "traditional" Pykka actor: there's no
214     subscribing, but instead methods return real driver results.  If
215     you're interested in those results, you should get them from the
216     Future that the proxy method returns.  Be prepared to handle exceptions
217     from the cloud driver when you do.
218     """
219     def __init__(self, cloud_factory, max_retry_wait=180):
220         super(ComputeNodeUpdateActor, self).__init__()
221         self._cloud = cloud_factory()
222         self.max_retry_wait = max_retry_wait
223         self.error_streak = 0
224         self.next_request_time = time.time()
225
226     def _throttle_errors(orig_func):
227         @functools.wraps(orig_func)
228         def throttle_wrapper(self, *args, **kwargs):
229             throttle_time = self.next_request_time - time.time()
230             if throttle_time > 0:
231                 time.sleep(throttle_time)
232             self.next_request_time = time.time()
233             try:
234                 result = orig_func(self, *args, **kwargs)
235             except Exception as error:
236                 self.error_streak += 1
237                 self.next_request_time += min(2 ** self.error_streak,
238                                               self.max_retry_wait)
239                 raise
240             else:
241                 self.error_streak = 0
242                 return result
243         return throttle_wrapper
244
245     @_throttle_errors
246     def sync_node(self, cloud_node, arvados_node):
247         return self._cloud.sync_node(cloud_node, arvados_node)
248
249
250 class ComputeNodeMonitorActor(config.actor_class):
251     """Actor to manage a running compute node.
252
253     This actor gets updates about a compute node's cloud and Arvados records.
254     It uses this information to notify subscribers when the node is eligible
255     for shutdown.
256     """
257     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
258                  cloud_fqdn_func, timer_actor, update_actor, arvados_node=None,
259                  poll_stale_after=600, node_stale_after=3600):
260         super(ComputeNodeMonitorActor, self).__init__()
261         self._later = self.actor_ref.proxy()
262         self._logger = logging.getLogger('arvnodeman.computenode')
263         self._last_log = None
264         self._shutdowns = shutdown_timer
265         self._cloud_node_fqdn = cloud_fqdn_func
266         self._timer = timer_actor
267         self._update = update_actor
268         self.cloud_node = cloud_node
269         self.cloud_node_start_time = cloud_node_start_time
270         self.poll_stale_after = poll_stale_after
271         self.node_stale_after = node_stale_after
272         self.subscribers = set()
273         self.arvados_node = None
274         self._later.update_arvados_node(arvados_node)
275         self.last_shutdown_opening = None
276         self._later.consider_shutdown()
277
278     def subscribe(self, subscriber):
279         self.subscribers.add(subscriber)
280
281     def _debug(self, msg, *args):
282         if msg == self._last_log:
283             return
284         self._last_log = msg
285         self._logger.debug(msg, *args)
286
287     def in_state(self, *states):
288         # Return a boolean to say whether or not our Arvados node record is in
289         # one of the given states.  If state information is not
290         # available--because this node has no Arvados record, the record is
291         # stale, or the record has no state information--return None.
292         if (self.arvados_node is None) or not timestamp_fresh(
293               arvados_node_mtime(self.arvados_node), self.node_stale_after):
294             return None
295         state = self.arvados_node['crunch_worker_state']
296         if not state:
297             return None
298         result = state in states
299         if state == 'idle':
300             result = result and not self.arvados_node['job_uuid']
301         return result
302
303     def shutdown_eligible(self):
304         if not self._shutdowns.window_open():
305             return False
306         elif self.arvados_node is None:
307             # If this is a new, unpaired node, it's eligible for
308             # shutdown--we figure there was an error during bootstrap.
309             return timestamp_fresh(self.cloud_node_start_time,
310                                    self.node_stale_after)
311         else:
312             return self.in_state('idle')
313
314     def consider_shutdown(self):
315         next_opening = self._shutdowns.next_opening()
316         if self.shutdown_eligible():
317             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
318             _notify_subscribers(self._later, self.subscribers)
319         elif self._shutdowns.window_open():
320             self._debug("Node %s shutdown window open but node busy.",
321                         self.cloud_node.id)
322         elif self.last_shutdown_opening != next_opening:
323             self._debug("Node %s shutdown window closed.  Next at %s.",
324                         self.cloud_node.id, time.ctime(next_opening))
325             self._timer.schedule(next_opening, self._later.consider_shutdown)
326             self.last_shutdown_opening = next_opening
327
328     def offer_arvados_pair(self, arvados_node):
329         first_ping_s = arvados_node.get('first_ping_at')
330         if (self.arvados_node is not None) or (not first_ping_s):
331             return None
332         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
333               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
334             self._later.update_arvados_node(arvados_node)
335             return self.cloud_node.id
336         else:
337             return None
338
339     def update_cloud_node(self, cloud_node):
340         if cloud_node is not None:
341             self.cloud_node = cloud_node
342             self._later.consider_shutdown()
343
344     def update_arvados_node(self, arvados_node):
345         # If the cloud node's FQDN doesn't match what's in the Arvados node
346         # record, make them match.
347         # This method is a little unusual in the way it just fires off the
348         # request without checking the result or retrying errors.  That's
349         # because this update happens every time we reload the Arvados node
350         # list: if a previous sync attempt failed, we'll see that the names
351         # are out of sync and just try again.  ComputeNodeUpdateActor has
352         # the logic to throttle those effective retries when there's trouble.
353         if arvados_node is not None:
354             self.arvados_node = arvados_node
355             if (self._cloud_node_fqdn(self.cloud_node) !=
356                   arvados_node_fqdn(self.arvados_node)):
357                 self._update.sync_node(self.cloud_node, self.arvados_node)
358             self._later.consider_shutdown()