Merge branch '7832-pysdk-use-slots' refs #7832
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
14 from ...clientactor import _notify_subscribers
15 from ... import config
16
17 class ComputeNodeStateChangeBase(config.actor_class):
18     """Base class for actors that change a compute node's state.
19
20     This base class takes care of retrying changes and notifying
21     subscribers when the change is finished.
22     """
23     def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
24                  retry_wait, max_retry_wait):
25         super(ComputeNodeStateChangeBase, self).__init__()
26         self._later = self.actor_ref.proxy()
27         self._logger = logging.getLogger(logger_name)
28         self._cloud = cloud_client
29         self._arvados = arvados_client
30         self._timer = timer_actor
31         self.min_retry_wait = retry_wait
32         self.max_retry_wait = max_retry_wait
33         self.retry_wait = retry_wait
34         self.subscribers = set()
35
36     @staticmethod
37     def _retry(errors=()):
38         """Retry decorator for an actor method that makes remote requests.
39
40         Use this function to decorator an actor method, and pass in a
41         tuple of exceptions to catch.  This decorator will schedule
42         retries of that method with exponential backoff if the
43         original method raises a known cloud driver error, or any of the
44         given exception types.
45         """
46         def decorator(orig_func):
47             @functools.wraps(orig_func)
48             def retry_wrapper(self, *args, **kwargs):
49                 start_time = time.time()
50                 try:
51                     orig_func(self, *args, **kwargs)
52                 except Exception as error:
53                     if not (isinstance(error, errors) or
54                             self._cloud.is_cloud_exception(error)):
55                         raise
56                     self._logger.warning(
57                         "Client error: %s - waiting %s seconds",
58                         error, self.retry_wait)
59                     self._timer.schedule(start_time + self.retry_wait,
60                                          getattr(self._later,
61                                                  orig_func.__name__),
62                                          *args, **kwargs)
63                     self.retry_wait = min(self.retry_wait * 2,
64                                           self.max_retry_wait)
65                 else:
66                     self.retry_wait = self.min_retry_wait
67             return retry_wrapper
68         return decorator
69
70     def _finished(self):
71         _notify_subscribers(self._later, self.subscribers)
72         self.subscribers = None
73
74     def subscribe(self, subscriber):
75         if self.subscribers is None:
76             try:
77                 subscriber(self._later)
78             except pykka.ActorDeadError:
79                 pass
80         else:
81             self.subscribers.add(subscriber)
82
83     def _clean_arvados_node(self, arvados_node, explanation):
84         return self._arvados.nodes().update(
85             uuid=arvados_node['uuid'],
86             body={'hostname': None,
87                   'ip_address': None,
88                   'slot_number': None,
89                   'first_ping_at': None,
90                   'last_ping_at': None,
91                   'properties': {},
92                   'info': {'ec2_instance_id': None,
93                            'last_action': explanation}},
94             ).execute()
95
96
97 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
98     """Actor to create and set up a cloud compute node.
99
100     This actor prepares an Arvados node record for a new compute node
101     (either creating one or cleaning one passed in), then boots the
102     actual compute node.  It notifies subscribers when the cloud node
103     is successfully created (the last step in the process for Node
104     Manager to handle).
105     """
106     def __init__(self, timer_actor, arvados_client, cloud_client,
107                  cloud_size, arvados_node=None,
108                  retry_wait=1, max_retry_wait=180):
109         super(ComputeNodeSetupActor, self).__init__(
110             'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
111             retry_wait, max_retry_wait)
112         self.cloud_size = cloud_size
113         self.arvados_node = None
114         self.cloud_node = None
115         if arvados_node is None:
116             self._later.create_arvados_node()
117         else:
118             self._later.prepare_arvados_node(arvados_node)
119
120     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
121     def create_arvados_node(self):
122         self.arvados_node = self._arvados.nodes().create(body={}).execute()
123         self._later.create_cloud_node()
124
125     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
126     def prepare_arvados_node(self, node):
127         self.arvados_node = self._clean_arvados_node(
128             node, "Prepared by Node Manager")
129         self._later.create_cloud_node()
130
131     @ComputeNodeStateChangeBase._retry()
132     def create_cloud_node(self):
133         self._logger.info("Creating cloud node with size %s.",
134                           self.cloud_size.name)
135         self.cloud_node = self._cloud.create_node(self.cloud_size,
136                                                   self.arvados_node)
137         if not self.cloud_node.size:
138              self.cloud_node.size = self.cloud_size
139         self._logger.info("Cloud node %s created.", self.cloud_node.id)
140         self._later.update_arvados_node_properties()
141
142     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
143     def update_arvados_node_properties(self):
144         """Tell Arvados some details about the cloud node.
145
146         Currently we only include size/price from our request, which
147         we already knew before create_cloud_node(), but doing it here
148         gives us an opportunity to provide more detail from
149         self.cloud_node, too.
150         """
151         self.arvados_node['properties']['cloud_node'] = {
152             # Note this 'size' is the node size we asked the cloud
153             # driver to create -- not necessarily equal to the size
154             # reported by the cloud driver for the node that was
155             # created.
156             'size': self.cloud_size.id,
157             'price': self.cloud_size.price,
158         }
159         self.arvados_node = self._arvados.nodes().update(
160             uuid=self.arvados_node['uuid'],
161             body={'properties': self.arvados_node['properties']},
162         ).execute()
163         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
164         self._later.post_create()
165
166     @ComputeNodeStateChangeBase._retry()
167     def post_create(self):
168         self._cloud.post_create_node(self.cloud_node)
169         self._logger.info("%s post-create work done.", self.cloud_node.id)
170         self._finished()
171
172     def stop_if_no_cloud_node(self):
173         if self.cloud_node is not None:
174             return False
175         self.stop()
176         return True
177
178
179 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
180     """Actor to shut down a compute node.
181
182     This actor simply destroys a cloud node, retrying as needed.
183     """
184     # Reasons for a shutdown to be cancelled.
185     WINDOW_CLOSED = "shutdown window closed"
186     NODE_BROKEN = "cloud failed to shut down broken node"
187
188     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
189                  cancellable=True, retry_wait=1, max_retry_wait=180):
190         # If a ShutdownActor is cancellable, it will ask the
191         # ComputeNodeMonitorActor if it's still eligible before taking each
192         # action, and stop the shutdown process if the node is no longer
193         # eligible.  Normal shutdowns based on job demand should be
194         # cancellable; shutdowns based on node misbehavior should not.
195         super(ComputeNodeShutdownActor, self).__init__(
196             'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
197             retry_wait, max_retry_wait)
198         self._monitor = node_monitor.proxy()
199         self.cloud_node = self._monitor.cloud_node.get()
200         self.cancellable = cancellable
201         self.cancel_reason = None
202         self.success = None
203
204     def on_start(self):
205         self._later.shutdown_node()
206
207     def _arvados_node(self):
208         return self._monitor.arvados_node.get()
209
210     def _finished(self, success_flag=None):
211         if success_flag is not None:
212             self.success = success_flag
213         return super(ComputeNodeShutdownActor, self)._finished()
214
215     def cancel_shutdown(self, reason):
216         self.cancel_reason = reason
217         self._logger.info("Cloud node %s shutdown cancelled: %s.",
218                           self.cloud_node.id, reason)
219         self._finished(success_flag=False)
220
221     def _stop_if_window_closed(orig_func):
222         @functools.wraps(orig_func)
223         def stop_wrapper(self, *args, **kwargs):
224             if (self.cancellable and
225                   (not self._monitor.shutdown_eligible().get())):
226                 self._later.cancel_shutdown(self.WINDOW_CLOSED)
227                 return None
228             else:
229                 return orig_func(self, *args, **kwargs)
230         return stop_wrapper
231
232     @_stop_if_window_closed
233     @ComputeNodeStateChangeBase._retry()
234     def shutdown_node(self):
235         if not self._cloud.destroy_node(self.cloud_node):
236             if self._cloud.broken(self.cloud_node):
237                 self._later.cancel_shutdown(self.NODE_BROKEN)
238             else:
239                 # Force a retry.
240                 raise cloud_types.LibcloudError("destroy_node failed")
241         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
242         arv_node = self._arvados_node()
243         if arv_node is None:
244             self._finished(success_flag=True)
245         else:
246             self._later.clean_arvados_node(arv_node)
247
248     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
249     def clean_arvados_node(self, arvados_node):
250         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
251         self._finished(success_flag=True)
252
253     # Make the decorator available to subclasses.
254     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
255
256
257 class ComputeNodeUpdateActor(config.actor_class):
258     """Actor to dispatch one-off cloud management requests.
259
260     This actor receives requests for small cloud updates, and
261     dispatches them to a real driver.  ComputeNodeMonitorActors use
262     this to perform maintenance tasks on themselves.  Having a
263     dedicated actor for this gives us the opportunity to control the
264     flow of requests; e.g., by backing off when errors occur.
265
266     This actor is most like a "traditional" Pykka actor: there's no
267     subscribing, but instead methods return real driver results.  If
268     you're interested in those results, you should get them from the
269     Future that the proxy method returns.  Be prepared to handle exceptions
270     from the cloud driver when you do.
271     """
272     def __init__(self, cloud_factory, max_retry_wait=180):
273         super(ComputeNodeUpdateActor, self).__init__()
274         self._cloud = cloud_factory()
275         self.max_retry_wait = max_retry_wait
276         self.error_streak = 0
277         self.next_request_time = time.time()
278
279     def _throttle_errors(orig_func):
280         @functools.wraps(orig_func)
281         def throttle_wrapper(self, *args, **kwargs):
282             throttle_time = self.next_request_time - time.time()
283             if throttle_time > 0:
284                 time.sleep(throttle_time)
285             self.next_request_time = time.time()
286             try:
287                 result = orig_func(self, *args, **kwargs)
288             except Exception as error:
289                 self.error_streak += 1
290                 self.next_request_time += min(2 ** self.error_streak,
291                                               self.max_retry_wait)
292                 raise
293             else:
294                 self.error_streak = 0
295                 return result
296         return throttle_wrapper
297
298     @_throttle_errors
299     def sync_node(self, cloud_node, arvados_node):
300         return self._cloud.sync_node(cloud_node, arvados_node)
301
302
303 class ComputeNodeMonitorActor(config.actor_class):
304     """Actor to manage a running compute node.
305
306     This actor gets updates about a compute node's cloud and Arvados records.
307     It uses this information to notify subscribers when the node is eligible
308     for shutdown.
309     """
310     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
311                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
312                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
313                  boot_fail_after=1800
314     ):
315         super(ComputeNodeMonitorActor, self).__init__()
316         self._later = self.actor_ref.proxy()
317         self._logger = logging.getLogger('arvnodeman.computenode')
318         self._last_log = None
319         self._shutdowns = shutdown_timer
320         self._cloud_node_fqdn = cloud_fqdn_func
321         self._timer = timer_actor
322         self._update = update_actor
323         self._cloud = cloud_client
324         self.cloud_node = cloud_node
325         self.cloud_node_start_time = cloud_node_start_time
326         self.poll_stale_after = poll_stale_after
327         self.node_stale_after = node_stale_after
328         self.boot_fail_after = boot_fail_after
329         self.subscribers = set()
330         self.arvados_node = None
331         self._later.update_arvados_node(arvados_node)
332         self.last_shutdown_opening = None
333         self._later.consider_shutdown()
334
335     def subscribe(self, subscriber):
336         self.subscribers.add(subscriber)
337
338     def _debug(self, msg, *args):
339         if msg == self._last_log:
340             return
341         self._last_log = msg
342         self._logger.debug(msg, *args)
343
344     def in_state(self, *states):
345         # Return a boolean to say whether or not our Arvados node record is in
346         # one of the given states.  If state information is not
347         # available--because this node has no Arvados record, the record is
348         # stale, or the record has no state information--return None.
349         if (self.arvados_node is None) or not timestamp_fresh(
350               arvados_node_mtime(self.arvados_node), self.node_stale_after):
351             return None
352         state = self.arvados_node['crunch_worker_state']
353         if not state:
354             return None
355         result = state in states
356         if state == 'idle':
357             result = result and not self.arvados_node['job_uuid']
358         return result
359
360     def shutdown_eligible(self):
361         if not self._shutdowns.window_open():
362             return False
363         if self.arvados_node is None:
364             # Node is unpaired.
365             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
366             return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
367         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
368         if missing and self._cloud.broken(self.cloud_node):
369             # Node is paired, but Arvados says it is missing and the cloud says the node
370             # is in an error state, so shut it down.
371             return True
372         if missing is None and self._cloud.broken(self.cloud_node):
373             self._logger.warning(
374                 "cloud reports broken node, but paired node %s never pinged "
375                 "(bug?) -- skipped check for node_stale_after",
376                 self.arvados_node['uuid'])
377         return self.in_state('idle')
378
379     def consider_shutdown(self):
380         next_opening = self._shutdowns.next_opening()
381         if self.shutdown_eligible():
382             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
383             _notify_subscribers(self._later, self.subscribers)
384         elif self._shutdowns.window_open():
385             self._debug("Node %s shutdown window open but node busy.",
386                         self.cloud_node.id)
387         elif self.last_shutdown_opening != next_opening:
388             self._debug("Node %s shutdown window closed.  Next at %s.",
389                         self.cloud_node.id, time.ctime(next_opening))
390             self._timer.schedule(next_opening, self._later.consider_shutdown)
391             self.last_shutdown_opening = next_opening
392
393     def offer_arvados_pair(self, arvados_node):
394         first_ping_s = arvados_node.get('first_ping_at')
395         if (self.arvados_node is not None) or (not first_ping_s):
396             return None
397         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
398               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
399             self._later.update_arvados_node(arvados_node)
400             return self.cloud_node.id
401         else:
402             return None
403
404     def update_cloud_node(self, cloud_node):
405         if cloud_node is not None:
406             self.cloud_node = cloud_node
407             self._later.consider_shutdown()
408
409     def update_arvados_node(self, arvados_node):
410         # If the cloud node's FQDN doesn't match what's in the Arvados node
411         # record, make them match.
412         # This method is a little unusual in the way it just fires off the
413         # request without checking the result or retrying errors.  That's
414         # because this update happens every time we reload the Arvados node
415         # list: if a previous sync attempt failed, we'll see that the names
416         # are out of sync and just try again.  ComputeNodeUpdateActor has
417         # the logic to throttle those effective retries when there's trouble.
418         if arvados_node is not None:
419             self.arvados_node = arvados_node
420             if (self._cloud_node_fqdn(self.cloud_node) !=
421                   arvados_node_fqdn(self.arvados_node)):
422                 self._update.sync_node(self.cloud_node, self.arvados_node)
423             self._later.consider_shutdown()