8c983c1ca4042726881ce0e9019da0e9c2df9b60
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14     arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
17
18 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
19     """Base class for actors that change a compute node's state.
20
21     This base class takes care of retrying changes and notifying
22     subscribers when the change is finished.
23     """
24     def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
25                  retry_wait, max_retry_wait):
26         super(ComputeNodeStateChangeBase, self).__init__()
27         RetryMixin.__init__(self,
28                             retry_wait,
29                             max_retry_wait,
30                             logging.getLogger(logger_name),
31                             cloud_client,
32                             timer_actor)
33         self._later = self.actor_ref.proxy()
34         self._arvados = arvados_client
35         self.subscribers = set()
36
37     def _finished(self):
38         _notify_subscribers(self._later, self.subscribers)
39         self.subscribers = None
40
41     def subscribe(self, subscriber):
42         if self.subscribers is None:
43             try:
44                 subscriber(self._later)
45             except pykka.ActorDeadError:
46                 pass
47         else:
48             self.subscribers.add(subscriber)
49
50     def _clean_arvados_node(self, arvados_node, explanation):
51         return self._arvados.nodes().update(
52             uuid=arvados_node['uuid'],
53             body={'hostname': None,
54                   'ip_address': None,
55                   'slot_number': None,
56                   'first_ping_at': None,
57                   'last_ping_at': None,
58                   'properties': {},
59                   'info': {'ec2_instance_id': None,
60                            'last_action': explanation}},
61             ).execute()
62
63
64 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
65     """Actor to create and set up a cloud compute node.
66
67     This actor prepares an Arvados node record for a new compute node
68     (either creating one or cleaning one passed in), then boots the
69     actual compute node.  It notifies subscribers when the cloud node
70     is successfully created (the last step in the process for Node
71     Manager to handle).
72     """
73     def __init__(self, timer_actor, arvados_client, cloud_client,
74                  cloud_size, arvados_node=None,
75                  retry_wait=1, max_retry_wait=180):
76         super(ComputeNodeSetupActor, self).__init__(
77             'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
78             retry_wait, max_retry_wait)
79         self.cloud_size = cloud_size
80         self.arvados_node = None
81         self.cloud_node = None
82         if arvados_node is None:
83             self._later.create_arvados_node()
84         else:
85             self._later.prepare_arvados_node(arvados_node)
86
87     @RetryMixin._retry(config.ARVADOS_ERRORS)
88     def create_arvados_node(self):
89         self.arvados_node = self._arvados.nodes().create(body={}).execute()
90         self._later.create_cloud_node()
91
92     @RetryMixin._retry(config.ARVADOS_ERRORS)
93     def prepare_arvados_node(self, node):
94         self.arvados_node = self._clean_arvados_node(
95             node, "Prepared by Node Manager")
96         self._later.create_cloud_node()
97
98     @RetryMixin._retry()
99     def create_cloud_node(self):
100         self._logger.info("Creating cloud node with size %s.",
101                           self.cloud_size.name)
102         self.cloud_node = self._cloud.create_node(self.cloud_size,
103                                                   self.arvados_node)
104         if not self.cloud_node.size:
105              self.cloud_node.size = self.cloud_size
106         self._logger.info("Cloud node %s created.", self.cloud_node.id)
107         self._later.update_arvados_node_properties()
108
109     @RetryMixin._retry(config.ARVADOS_ERRORS)
110     def update_arvados_node_properties(self):
111         """Tell Arvados some details about the cloud node.
112
113         Currently we only include size/price from our request, which
114         we already knew before create_cloud_node(), but doing it here
115         gives us an opportunity to provide more detail from
116         self.cloud_node, too.
117         """
118         self.arvados_node['properties']['cloud_node'] = {
119             # Note this 'size' is the node size we asked the cloud
120             # driver to create -- not necessarily equal to the size
121             # reported by the cloud driver for the node that was
122             # created.
123             'size': self.cloud_size.id,
124             'price': self.cloud_size.price,
125         }
126         self.arvados_node = self._arvados.nodes().update(
127             uuid=self.arvados_node['uuid'],
128             body={'properties': self.arvados_node['properties']},
129         ).execute()
130         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
131         self._later.post_create()
132
133     @RetryMixin._retry()
134     def post_create(self):
135         self._cloud.post_create_node(self.cloud_node)
136         self._logger.info("%s post-create work done.", self.cloud_node.id)
137         self._finished()
138
139     def stop_if_no_cloud_node(self):
140         if self.cloud_node is not None:
141             return False
142         self.stop()
143         return True
144
145
146 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
147     """Actor to shut down a compute node.
148
149     This actor simply destroys a cloud node, retrying as needed.
150     """
151     # Reasons for a shutdown to be cancelled.
152     WINDOW_CLOSED = "shutdown window closed"
153     NODE_BROKEN = "cloud failed to shut down broken node"
154
155     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
156                  cancellable=True, retry_wait=1, max_retry_wait=180):
157         # If a ShutdownActor is cancellable, it will ask the
158         # ComputeNodeMonitorActor if it's still eligible before taking each
159         # action, and stop the shutdown process if the node is no longer
160         # eligible.  Normal shutdowns based on job demand should be
161         # cancellable; shutdowns based on node misbehavior should not.
162         super(ComputeNodeShutdownActor, self).__init__(
163             'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
164             retry_wait, max_retry_wait)
165         self._monitor = node_monitor.proxy()
166         self.cloud_node = self._monitor.cloud_node.get()
167         self.cancellable = cancellable
168         self.cancel_reason = None
169         self.success = None
170
171     def on_start(self):
172         self._later.shutdown_node()
173
174     def _arvados_node(self):
175         return self._monitor.arvados_node.get()
176
177     def _finished(self, success_flag=None):
178         if success_flag is not None:
179             self.success = success_flag
180         return super(ComputeNodeShutdownActor, self)._finished()
181
182     def cancel_shutdown(self, reason):
183         self.cancel_reason = reason
184         self._logger.info("Cloud node %s shutdown cancelled: %s.",
185                           self.cloud_node.id, reason)
186         self._finished(success_flag=False)
187
188     def _stop_if_window_closed(orig_func):
189         @functools.wraps(orig_func)
190         def stop_wrapper(self, *args, **kwargs):
191             if (self.cancellable and
192                   (not self._monitor.shutdown_eligible().get())):
193                 self._later.cancel_shutdown(self.WINDOW_CLOSED)
194                 return None
195             else:
196                 return orig_func(self, *args, **kwargs)
197         return stop_wrapper
198
199     @_stop_if_window_closed
200     @RetryMixin._retry()
201     def shutdown_node(self):
202         if not self._cloud.destroy_node(self.cloud_node):
203             if self._cloud.broken(self.cloud_node):
204                 self._later.cancel_shutdown(self.NODE_BROKEN)
205             else:
206                 # Force a retry.
207                 raise cloud_types.LibcloudError("destroy_node failed")
208         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
209         arv_node = self._arvados_node()
210         if arv_node is None:
211             self._finished(success_flag=True)
212         else:
213             self._later.clean_arvados_node(arv_node)
214
215     @RetryMixin._retry(config.ARVADOS_ERRORS)
216     def clean_arvados_node(self, arvados_node):
217         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
218         self._finished(success_flag=True)
219
220     # Make the decorator available to subclasses.
221     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
222
223
224 class ComputeNodeUpdateActor(config.actor_class):
225     """Actor to dispatch one-off cloud management requests.
226
227     This actor receives requests for small cloud updates, and
228     dispatches them to a real driver.  ComputeNodeMonitorActors use
229     this to perform maintenance tasks on themselves.  Having a
230     dedicated actor for this gives us the opportunity to control the
231     flow of requests; e.g., by backing off when errors occur.
232
233     This actor is most like a "traditional" Pykka actor: there's no
234     subscribing, but instead methods return real driver results.  If
235     you're interested in those results, you should get them from the
236     Future that the proxy method returns.  Be prepared to handle exceptions
237     from the cloud driver when you do.
238     """
239     def __init__(self, cloud_factory, max_retry_wait=180):
240         super(ComputeNodeUpdateActor, self).__init__()
241         self._cloud = cloud_factory()
242         self.max_retry_wait = max_retry_wait
243         self.error_streak = 0
244         self.next_request_time = time.time()
245
246     def _throttle_errors(orig_func):
247         @functools.wraps(orig_func)
248         def throttle_wrapper(self, *args, **kwargs):
249             throttle_time = self.next_request_time - time.time()
250             if throttle_time > 0:
251                 time.sleep(throttle_time)
252             self.next_request_time = time.time()
253             try:
254                 result = orig_func(self, *args, **kwargs)
255             except Exception as error:
256                 self.error_streak += 1
257                 self.next_request_time += min(2 ** self.error_streak,
258                                               self.max_retry_wait)
259                 raise
260             else:
261                 self.error_streak = 0
262                 return result
263         return throttle_wrapper
264
265     @_throttle_errors
266     def sync_node(self, cloud_node, arvados_node):
267         return self._cloud.sync_node(cloud_node, arvados_node)
268
269
270 class ComputeNodeMonitorActor(config.actor_class):
271     """Actor to manage a running compute node.
272
273     This actor gets updates about a compute node's cloud and Arvados records.
274     It uses this information to notify subscribers when the node is eligible
275     for shutdown.
276     """
277     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
278                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
279                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
280                  boot_fail_after=1800
281     ):
282         super(ComputeNodeMonitorActor, self).__init__()
283         self._later = self.actor_ref.proxy()
284         self._logger = logging.getLogger('arvnodeman.computenode')
285         self._last_log = None
286         self._shutdowns = shutdown_timer
287         self._cloud_node_fqdn = cloud_fqdn_func
288         self._timer = timer_actor
289         self._update = update_actor
290         self._cloud = cloud_client
291         self.cloud_node = cloud_node
292         self.cloud_node_start_time = cloud_node_start_time
293         self.poll_stale_after = poll_stale_after
294         self.node_stale_after = node_stale_after
295         self.boot_fail_after = boot_fail_after
296         self.subscribers = set()
297         self.arvados_node = None
298         self._later.update_arvados_node(arvados_node)
299         self.last_shutdown_opening = None
300         self._later.consider_shutdown()
301
302     def subscribe(self, subscriber):
303         self.subscribers.add(subscriber)
304
305     def _debug(self, msg, *args):
306         if msg == self._last_log:
307             return
308         self._last_log = msg
309         self._logger.debug(msg, *args)
310
311     def in_state(self, *states):
312         # Return a boolean to say whether or not our Arvados node record is in
313         # one of the given states.  If state information is not
314         # available--because this node has no Arvados record, the record is
315         # stale, or the record has no state information--return None.
316         if (self.arvados_node is None) or not timestamp_fresh(
317               arvados_node_mtime(self.arvados_node), self.node_stale_after):
318             return None
319         state = self.arvados_node['crunch_worker_state']
320         if not state:
321             return None
322         result = state in states
323         if state == 'idle':
324             result = result and not self.arvados_node['job_uuid']
325         return result
326
327     def shutdown_eligible(self):
328         if not self._shutdowns.window_open():
329             return False
330         if self.arvados_node is None:
331             # Node is unpaired.
332             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
333             return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
334         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
335         if missing and self._cloud.broken(self.cloud_node):
336             # Node is paired, but Arvados says it is missing and the cloud says the node
337             # is in an error state, so shut it down.
338             return True
339         if missing is None and self._cloud.broken(self.cloud_node):
340             self._logger.warning(
341                 "cloud reports broken node, but paired node %s never pinged "
342                 "(bug?) -- skipped check for node_stale_after",
343                 self.arvados_node['uuid'])
344         return self.in_state('idle')
345
346     def consider_shutdown(self):
347         next_opening = self._shutdowns.next_opening()
348         if self.shutdown_eligible():
349             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
350             _notify_subscribers(self._later, self.subscribers)
351         elif self._shutdowns.window_open():
352             self._debug("Node %s shutdown window open but node busy.",
353                         self.cloud_node.id)
354         elif self.last_shutdown_opening != next_opening:
355             self._debug("Node %s shutdown window closed.  Next at %s.",
356                         self.cloud_node.id, time.ctime(next_opening))
357             self._timer.schedule(next_opening, self._later.consider_shutdown)
358             self.last_shutdown_opening = next_opening
359
360     def offer_arvados_pair(self, arvados_node):
361         first_ping_s = arvados_node.get('first_ping_at')
362         if (self.arvados_node is not None) or (not first_ping_s):
363             return None
364         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
365               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
366             self._later.update_arvados_node(arvados_node)
367             return self.cloud_node.id
368         else:
369             return None
370
371     def update_cloud_node(self, cloud_node):
372         if cloud_node is not None:
373             self.cloud_node = cloud_node
374             self._later.consider_shutdown()
375
376     def update_arvados_node(self, arvados_node):
377         # If the cloud node's FQDN doesn't match what's in the Arvados node
378         # record, make them match.
379         # This method is a little unusual in the way it just fires off the
380         # request without checking the result or retrying errors.  That's
381         # because this update happens every time we reload the Arvados node
382         # list: if a previous sync attempt failed, we'll see that the names
383         # are out of sync and just try again.  ComputeNodeUpdateActor has
384         # the logic to throttle those effective retries when there's trouble.
385         if arvados_node is not None:
386             self.arvados_node = arvados_node
387             if (self._cloud_node_fqdn(self.cloud_node) !=
388                   arvados_node_fqdn(self.arvados_node)):
389                 self._update.sync_node(self.cloud_node, self.arvados_node)
390             self._later.consider_shutdown()