8099: 7263: Merge branch 'hgi/7263-even-better-busy-behavior' of github.com:wtsi...
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14     arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
17
18 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
19     """Base class for actors that change a compute node's state.
20
21     This base class takes care of retrying changes and notifying
22     subscribers when the change is finished.
23     """
24     def __init__(self, cloud_client, arvados_client, timer_actor,
25                  retry_wait, max_retry_wait):
26         super(ComputeNodeStateChangeBase, self).__init__()
27         RetryMixin.__init__(self, retry_wait, max_retry_wait,
28                             None, cloud_client, timer_actor)
29         self._later = self.actor_ref.proxy()
30         self._arvados = arvados_client
31         self.subscribers = set()
32
33     def _set_logger(self):
34         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
35
36     def on_start(self):
37         self._set_logger()
38
39     def _finished(self):
40         _notify_subscribers(self._later, self.subscribers)
41         self.subscribers = None
42         self._logger.info("finished")
43
44     def subscribe(self, subscriber):
45         if self.subscribers is None:
46             try:
47                 subscriber(self._later)
48             except pykka.ActorDeadError:
49                 pass
50         else:
51             self.subscribers.add(subscriber)
52
53     def _clean_arvados_node(self, arvados_node, explanation):
54         return self._arvados.nodes().update(
55             uuid=arvados_node['uuid'],
56             body={'hostname': None,
57                   'ip_address': None,
58                   'slot_number': None,
59                   'first_ping_at': None,
60                   'last_ping_at': None,
61                   'properties': {},
62                   'info': {'ec2_instance_id': None,
63                            'last_action': explanation}},
64             ).execute()
65
66     @staticmethod
67     def _finish_on_exception(orig_func):
68         @functools.wraps(orig_func)
69         def finish_wrapper(self, *args, **kwargs):
70             try:
71                 return orig_func(self, *args, **kwargs)
72             except Exception as error:
73                 self._logger.error("Actor error %s", error)
74                 self._finished()
75         return finish_wrapper
76
77
78 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
79     """Actor to create and set up a cloud compute node.
80
81     This actor prepares an Arvados node record for a new compute node
82     (either creating one or cleaning one passed in), then boots the
83     actual compute node.  It notifies subscribers when the cloud node
84     is successfully created (the last step in the process for Node
85     Manager to handle).
86     """
87     def __init__(self, timer_actor, arvados_client, cloud_client,
88                  cloud_size, arvados_node=None,
89                  retry_wait=1, max_retry_wait=180):
90         super(ComputeNodeSetupActor, self).__init__(
91             cloud_client, arvados_client, timer_actor,
92             retry_wait, max_retry_wait)
93         self.cloud_size = cloud_size
94         self.arvados_node = None
95         self.cloud_node = None
96         if arvados_node is None:
97             self._later.create_arvados_node()
98         else:
99             self._later.prepare_arvados_node(arvados_node)
100
101     @ComputeNodeStateChangeBase._finish_on_exception
102     @RetryMixin._retry(config.ARVADOS_ERRORS)
103     def create_arvados_node(self):
104         self.arvados_node = self._arvados.nodes().create(body={}).execute()
105         self._later.create_cloud_node()
106
107     @ComputeNodeStateChangeBase._finish_on_exception
108     @RetryMixin._retry(config.ARVADOS_ERRORS)
109     def prepare_arvados_node(self, node):
110         self.arvados_node = self._clean_arvados_node(
111             node, "Prepared by Node Manager")
112         self._later.create_cloud_node()
113
114     @ComputeNodeStateChangeBase._finish_on_exception
115     @RetryMixin._retry()
116     def create_cloud_node(self):
117         self._logger.info("Sending create_node request for node size %s.",
118                           self.cloud_size.name)
119         self.cloud_node = self._cloud.create_node(self.cloud_size,
120                                                   self.arvados_node)
121         if not self.cloud_node.size:
122              self.cloud_node.size = self.cloud_size
123         self._logger.info("Cloud node %s created.", self.cloud_node.id)
124         self._later.update_arvados_node_properties()
125
126     @ComputeNodeStateChangeBase._finish_on_exception
127     @RetryMixin._retry(config.ARVADOS_ERRORS)
128     def update_arvados_node_properties(self):
129         """Tell Arvados some details about the cloud node.
130
131         Currently we only include size/price from our request, which
132         we already knew before create_cloud_node(), but doing it here
133         gives us an opportunity to provide more detail from
134         self.cloud_node, too.
135         """
136         self.arvados_node['properties']['cloud_node'] = {
137             # Note this 'size' is the node size we asked the cloud
138             # driver to create -- not necessarily equal to the size
139             # reported by the cloud driver for the node that was
140             # created.
141             'size': self.cloud_size.id,
142             'price': self.cloud_size.price,
143         }
144         self.arvados_node = self._arvados.nodes().update(
145             uuid=self.arvados_node['uuid'],
146             body={'properties': self.arvados_node['properties']},
147         ).execute()
148         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
149         self._later.post_create()
150
151     @RetryMixin._retry()
152     def post_create(self):
153         self._cloud.post_create_node(self.cloud_node)
154         self._logger.info("%s post-create work done.", self.cloud_node.id)
155         self._finished()
156
157     def stop_if_no_cloud_node(self):
158         if self.cloud_node is not None:
159             return False
160         self.stop()
161         return True
162
163
164 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
165     """Actor to shut down a compute node.
166
167     This actor simply destroys a cloud node, retrying as needed.
168     """
169     # Reasons for a shutdown to be cancelled.
170     WINDOW_CLOSED = "shutdown window closed"
171     NODE_BROKEN = "cloud failed to shut down broken node"
172
173     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
174                  cancellable=True, retry_wait=1, max_retry_wait=180):
175         # If a ShutdownActor is cancellable, it will ask the
176         # ComputeNodeMonitorActor if it's still eligible before taking each
177         # action, and stop the shutdown process if the node is no longer
178         # eligible.  Normal shutdowns based on job demand should be
179         # cancellable; shutdowns based on node misbehavior should not.
180         super(ComputeNodeShutdownActor, self).__init__(
181             cloud_client, arvados_client, timer_actor,
182             retry_wait, max_retry_wait)
183         self._monitor = node_monitor.proxy()
184         self.cloud_node = self._monitor.cloud_node.get()
185         self.cancellable = cancellable
186         self.cancel_reason = None
187         self.success = None
188
189     def _set_logger(self):
190         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
191
192     def on_start(self):
193         super(ComputeNodeShutdownActor, self).on_start()
194         self._later.shutdown_node()
195
196     def _arvados_node(self):
197         return self._monitor.arvados_node.get()
198
199     def _finished(self, success_flag=None):
200         if success_flag is not None:
201             self.success = success_flag
202         return super(ComputeNodeShutdownActor, self)._finished()
203
204     def cancel_shutdown(self, reason):
205         self.cancel_reason = reason
206         self._logger.info("Shutdown cancelled: %s.", reason)
207         self._finished(success_flag=False)
208
209     def _stop_if_window_closed(orig_func):
210         @functools.wraps(orig_func)
211         def stop_wrapper(self, *args, **kwargs):
212             if (self.cancellable and
213                   (self._monitor.shutdown_eligible().get() is not True)):
214                 self._later.cancel_shutdown(self.WINDOW_CLOSED)
215                 return None
216             else:
217                 return orig_func(self, *args, **kwargs)
218         return stop_wrapper
219
220     @ComputeNodeStateChangeBase._finish_on_exception
221     @_stop_if_window_closed
222     @RetryMixin._retry()
223     def shutdown_node(self):
224         self._logger.info("Starting shutdown")
225         if not self._cloud.destroy_node(self.cloud_node):
226             if self._cloud.broken(self.cloud_node):
227                 self._later.cancel_shutdown(self.NODE_BROKEN)
228             else:
229                 # Force a retry.
230                 raise cloud_types.LibcloudError("destroy_node failed")
231         self._logger.info("Shutdown success")
232         arv_node = self._arvados_node()
233         if arv_node is None:
234             self._finished(success_flag=True)
235         else:
236             self._later.clean_arvados_node(arv_node)
237
238     @ComputeNodeStateChangeBase._finish_on_exception
239     @RetryMixin._retry(config.ARVADOS_ERRORS)
240     def clean_arvados_node(self, arvados_node):
241         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
242         self._finished(success_flag=True)
243
244     # Make the decorator available to subclasses.
245     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
246
247
248 class ComputeNodeUpdateActor(config.actor_class):
249     """Actor to dispatch one-off cloud management requests.
250
251     This actor receives requests for small cloud updates, and
252     dispatches them to a real driver.  ComputeNodeMonitorActors use
253     this to perform maintenance tasks on themselves.  Having a
254     dedicated actor for this gives us the opportunity to control the
255     flow of requests; e.g., by backing off when errors occur.
256
257     This actor is most like a "traditional" Pykka actor: there's no
258     subscribing, but instead methods return real driver results.  If
259     you're interested in those results, you should get them from the
260     Future that the proxy method returns.  Be prepared to handle exceptions
261     from the cloud driver when you do.
262     """
263     def __init__(self, cloud_factory, max_retry_wait=180):
264         super(ComputeNodeUpdateActor, self).__init__()
265         self._cloud = cloud_factory()
266         self.max_retry_wait = max_retry_wait
267         self.error_streak = 0
268         self.next_request_time = time.time()
269
270     def _throttle_errors(orig_func):
271         @functools.wraps(orig_func)
272         def throttle_wrapper(self, *args, **kwargs):
273             throttle_time = self.next_request_time - time.time()
274             if throttle_time > 0:
275                 time.sleep(throttle_time)
276             self.next_request_time = time.time()
277             try:
278                 result = orig_func(self, *args, **kwargs)
279             except Exception as error:
280                 self.error_streak += 1
281                 self.next_request_time += min(2 ** self.error_streak,
282                                               self.max_retry_wait)
283                 raise
284             else:
285                 self.error_streak = 0
286                 return result
287         return throttle_wrapper
288
289     @_throttle_errors
290     def sync_node(self, cloud_node, arvados_node):
291         return self._cloud.sync_node(cloud_node, arvados_node)
292
293
294 class ComputeNodeMonitorActor(config.actor_class):
295     """Actor to manage a running compute node.
296
297     This actor gets updates about a compute node's cloud and Arvados records.
298     It uses this information to notify subscribers when the node is eligible
299     for shutdown.
300     """
301     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
302                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
303                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
304                  boot_fail_after=1800
305     ):
306         super(ComputeNodeMonitorActor, self).__init__()
307         self._later = self.actor_ref.proxy()
308         self._last_log = None
309         self._shutdowns = shutdown_timer
310         self._cloud_node_fqdn = cloud_fqdn_func
311         self._timer = timer_actor
312         self._update = update_actor
313         self._cloud = cloud_client
314         self.cloud_node = cloud_node
315         self.cloud_node_start_time = cloud_node_start_time
316         self.poll_stale_after = poll_stale_after
317         self.node_stale_after = node_stale_after
318         self.boot_fail_after = boot_fail_after
319         self.subscribers = set()
320         self.arvados_node = None
321         self._later.update_arvados_node(arvados_node)
322         self.last_shutdown_opening = None
323         self._later.consider_shutdown()
324
325     def _set_logger(self):
326         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
327
328     def on_start(self):
329         self._set_logger()
330         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
331
332     def subscribe(self, subscriber):
333         self.subscribers.add(subscriber)
334
335     def _debug(self, msg, *args):
336         if msg == self._last_log:
337             return
338         self._last_log = msg
339         self._logger.debug(msg, *args)
340
341     def in_state(self, *states):
342         # Return a boolean to say whether or not our Arvados node record is in
343         # one of the given states.  If state information is not
344         # available--because this node has no Arvados record, the record is
345         # stale, or the record has no state information--return None.
346         if (self.arvados_node is None) or not timestamp_fresh(
347               arvados_node_mtime(self.arvados_node), self.node_stale_after):
348             return None
349         state = self.arvados_node['crunch_worker_state']
350         if not state:
351             return None
352         result = state in states
353         if state == 'idle':
354             result = result and not self.arvados_node['job_uuid']
355         return result
356
357     def shutdown_eligible(self):
358         """Return True if eligible for shutdown, or a string explaining why the node
359         is not eligible for shutdown."""
360
361         if not self._shutdowns.window_open():
362             return "shutdown window is not open."
363         if self.arvados_node is None:
364             # Node is unpaired.
365             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
366             if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
367                 return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
368             else:
369                 return True
370         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
371         if missing and self._cloud.broken(self.cloud_node):
372             # Node is paired, but Arvados says it is missing and the cloud says the node
373             # is in an error state, so shut it down.
374             return True
375         if missing is None and self._cloud.broken(self.cloud_node):
376             self._logger.info(
377                 "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
378                 "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
379                 self.arvados_node['uuid'])
380         if self.in_state('idle'):
381             return True
382         else:
383             return "node is not idle."
384
385     def consider_shutdown(self):
386         try:
387             next_opening = self._shutdowns.next_opening()
388             eligible = self.shutdown_eligible()
389             if eligible is True:
390                 self._debug("Suggesting shutdown.")
391                 _notify_subscribers(self._later, self.subscribers)
392             elif self._shutdowns.window_open():
393                 self._debug("Cannot shut down because %s", eligible)
394             elif self.last_shutdown_opening != next_opening:
395                 self._debug("Shutdown window closed.  Next at %s.",
396                             time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
397                 self._timer.schedule(next_opening, self._later.consider_shutdown)
398                 self.last_shutdown_opening = next_opening
399         except Exception:
400             self._logger.exception("Unexpected exception")
401
402     def offer_arvados_pair(self, arvados_node):
403         first_ping_s = arvados_node.get('first_ping_at')
404         if (self.arvados_node is not None) or (not first_ping_s):
405             return None
406         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
407               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
408             self._later.update_arvados_node(arvados_node)
409             return self.cloud_node.id
410         else:
411             return None
412
413     def update_cloud_node(self, cloud_node):
414         if cloud_node is not None:
415             self.cloud_node = cloud_node
416             self._later.consider_shutdown()
417
418     def update_arvados_node(self, arvados_node):
419         # If the cloud node's FQDN doesn't match what's in the Arvados node
420         # record, make them match.
421         # This method is a little unusual in the way it just fires off the
422         # request without checking the result or retrying errors.  That's
423         # because this update happens every time we reload the Arvados node
424         # list: if a previous sync attempt failed, we'll see that the names
425         # are out of sync and just try again.  ComputeNodeUpdateActor has
426         # the logic to throttle those effective retries when there's trouble.
427         if arvados_node is not None:
428             self.arvados_node = arvados_node
429             if (self._cloud_node_fqdn(self.cloud_node) !=
430                   arvados_node_fqdn(self.arvados_node)):
431                 self._update.sync_node(self.cloud_node, self.arvados_node)
432             self._later.consider_shutdown()