9161: Adjusting behavior to accomodate down/broken/missing nodes.
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14     arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
17 from .transitions import transitions
18
19 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
20     """Base class for actors that change a compute node's state.
21
22     This base class takes care of retrying changes and notifying
23     subscribers when the change is finished.
24     """
25     def __init__(self, cloud_client, arvados_client, timer_actor,
26                  retry_wait, max_retry_wait):
27         super(ComputeNodeStateChangeBase, self).__init__()
28         RetryMixin.__init__(self, retry_wait, max_retry_wait,
29                             None, cloud_client, timer_actor)
30         self._later = self.actor_ref.tell_proxy()
31         self._arvados = arvados_client
32         self.subscribers = set()
33
34     def _set_logger(self):
35         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
36
37     def on_start(self):
38         self._set_logger()
39
40     def _finished(self):
41         if self.subscribers is None:
42             raise Exception("Actor tried to finish twice")
43         _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
44         self.subscribers = None
45         self._logger.info("finished")
46
47     def subscribe(self, subscriber):
48         if self.subscribers is None:
49             try:
50                 subscriber(self.actor_ref.proxy())
51             except pykka.ActorDeadError:
52                 pass
53         else:
54             self.subscribers.add(subscriber)
55
56     def _clean_arvados_node(self, arvados_node, explanation):
57         return self._arvados.nodes().update(
58             uuid=arvados_node['uuid'],
59             body={'hostname': None,
60                   'ip_address': None,
61                   'slot_number': None,
62                   'first_ping_at': None,
63                   'last_ping_at': None,
64                   'properties': {},
65                   'info': {'ec2_instance_id': None,
66                            'last_action': explanation}},
67             ).execute()
68
69     @staticmethod
70     def _finish_on_exception(orig_func):
71         @functools.wraps(orig_func)
72         def finish_wrapper(self, *args, **kwargs):
73             try:
74                 return orig_func(self, *args, **kwargs)
75             except Exception as error:
76                 self._logger.error("Actor error %s", error)
77                 self._finished()
78         return finish_wrapper
79
80
81 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
82     """Actor to create and set up a cloud compute node.
83
84     This actor prepares an Arvados node record for a new compute node
85     (either creating one or cleaning one passed in), then boots the
86     actual compute node.  It notifies subscribers when the cloud node
87     is successfully created (the last step in the process for Node
88     Manager to handle).
89     """
90     def __init__(self, timer_actor, arvados_client, cloud_client,
91                  cloud_size, arvados_node=None,
92                  retry_wait=1, max_retry_wait=180):
93         super(ComputeNodeSetupActor, self).__init__(
94             cloud_client, arvados_client, timer_actor,
95             retry_wait, max_retry_wait)
96         self.cloud_size = cloud_size
97         self.arvados_node = None
98         self.cloud_node = None
99         if arvados_node is None:
100             self._later.create_arvados_node()
101         else:
102             self._later.prepare_arvados_node(arvados_node)
103
104     @ComputeNodeStateChangeBase._finish_on_exception
105     @RetryMixin._retry(config.ARVADOS_ERRORS)
106     def create_arvados_node(self):
107         self.arvados_node = self._arvados.nodes().create(body={}).execute()
108         self._later.create_cloud_node()
109
110     @ComputeNodeStateChangeBase._finish_on_exception
111     @RetryMixin._retry(config.ARVADOS_ERRORS)
112     def prepare_arvados_node(self, node):
113         self.arvados_node = self._clean_arvados_node(
114             node, "Prepared by Node Manager")
115         self._later.create_cloud_node()
116
117     @ComputeNodeStateChangeBase._finish_on_exception
118     @RetryMixin._retry()
119     def create_cloud_node(self):
120         self._logger.info("Sending create_node request for node size %s.",
121                           self.cloud_size.name)
122         self.cloud_node = self._cloud.create_node(self.cloud_size,
123                                                   self.arvados_node)
124         if not self.cloud_node.size:
125              self.cloud_node.size = self.cloud_size
126         self._logger.info("Cloud node %s created.", self.cloud_node.id)
127         self._later.update_arvados_node_properties()
128
129     @ComputeNodeStateChangeBase._finish_on_exception
130     @RetryMixin._retry(config.ARVADOS_ERRORS)
131     def update_arvados_node_properties(self):
132         """Tell Arvados some details about the cloud node.
133
134         Currently we only include size/price from our request, which
135         we already knew before create_cloud_node(), but doing it here
136         gives us an opportunity to provide more detail from
137         self.cloud_node, too.
138         """
139         self.arvados_node['properties']['cloud_node'] = {
140             # Note this 'size' is the node size we asked the cloud
141             # driver to create -- not necessarily equal to the size
142             # reported by the cloud driver for the node that was
143             # created.
144             'size': self.cloud_size.id,
145             'price': self.cloud_size.price,
146         }
147         self.arvados_node = self._arvados.nodes().update(
148             uuid=self.arvados_node['uuid'],
149             body={'properties': self.arvados_node['properties']},
150         ).execute()
151         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
152         self._later.post_create()
153
154     @RetryMixin._retry()
155     def post_create(self):
156         self._cloud.post_create_node(self.cloud_node)
157         self._logger.info("%s post-create work done.", self.cloud_node.id)
158         self._finished()
159
160     def stop_if_no_cloud_node(self):
161         if self.cloud_node is not None:
162             return False
163         self.stop()
164         return True
165
166
167 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
168     """Actor to shut down a compute node.
169
170     This actor simply destroys a cloud node, retrying as needed.
171     """
172     # Reasons for a shutdown to be cancelled.
173     WINDOW_CLOSED = "shutdown window closed"
174     NODE_BROKEN = "cloud failed to shut down broken node"
175
176     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
177                  cancellable=True, retry_wait=1, max_retry_wait=180):
178         # If a ShutdownActor is cancellable, it will ask the
179         # ComputeNodeMonitorActor if it's still eligible before taking each
180         # action, and stop the shutdown process if the node is no longer
181         # eligible.  Normal shutdowns based on job demand should be
182         # cancellable; shutdowns based on node misbehavior should not.
183         super(ComputeNodeShutdownActor, self).__init__(
184             cloud_client, arvados_client, timer_actor,
185             retry_wait, max_retry_wait)
186         self._monitor = node_monitor.proxy()
187         self.cloud_node = self._monitor.cloud_node.get()
188         self.cancellable = cancellable
189         self.cancel_reason = None
190         self.success = None
191
192     def _set_logger(self):
193         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
194
195     def on_start(self):
196         super(ComputeNodeShutdownActor, self).on_start()
197         self._later.shutdown_node()
198
199     def _arvados_node(self):
200         return self._monitor.arvados_node.get()
201
202     def _finished(self, success_flag=None):
203         if success_flag is not None:
204             self.success = success_flag
205         return super(ComputeNodeShutdownActor, self)._finished()
206
207     def cancel_shutdown(self, reason):
208         self.cancel_reason = reason
209         self._logger.info("Shutdown cancelled: %s.", reason)
210         self._finished(success_flag=False)
211
212     def _cancel_on_exception(orig_func):
213         @functools.wraps(orig_func)
214         def finish_wrapper(self, *args, **kwargs):
215             try:
216                 return orig_func(self, *args, **kwargs)
217             except Exception as error:
218                 self._logger.error("Actor error %s", error)
219                 self._later.cancel_shutdown("Unhandled exception %s" % error)
220         return finish_wrapper
221
222     @_cancel_on_exception
223     @RetryMixin._retry()
224     def shutdown_node(self):
225         self._logger.info("Starting shutdown")
226         if not self._cloud.destroy_node(self.cloud_node):
227             if self._cloud.broken(self.cloud_node):
228                 self._later.cancel_shutdown(self.NODE_BROKEN)
229                 return
230             else:
231                 # Force a retry.
232                 raise cloud_types.LibcloudError("destroy_node failed")
233         self._logger.info("Shutdown success")
234         arv_node = self._arvados_node()
235         if arv_node is None:
236             self._finished(success_flag=True)
237         else:
238             self._later.clean_arvados_node(arv_node)
239
240     @ComputeNodeStateChangeBase._finish_on_exception
241     @RetryMixin._retry(config.ARVADOS_ERRORS)
242     def clean_arvados_node(self, arvados_node):
243         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
244         self._finished(success_flag=True)
245
246
247 class ComputeNodeUpdateActor(config.actor_class):
248     """Actor to dispatch one-off cloud management requests.
249
250     This actor receives requests for small cloud updates, and
251     dispatches them to a real driver.  ComputeNodeMonitorActors use
252     this to perform maintenance tasks on themselves.  Having a
253     dedicated actor for this gives us the opportunity to control the
254     flow of requests; e.g., by backing off when errors occur.
255     """
256     def __init__(self, cloud_factory, max_retry_wait=180):
257         super(ComputeNodeUpdateActor, self).__init__()
258         self._cloud = cloud_factory()
259         self.max_retry_wait = max_retry_wait
260         self.error_streak = 0
261         self.next_request_time = time.time()
262
263     def _set_logger(self):
264         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
265
266     def on_start(self):
267         self._set_logger()
268
269     def _throttle_errors(orig_func):
270         @functools.wraps(orig_func)
271         def throttle_wrapper(self, *args, **kwargs):
272             throttle_time = self.next_request_time - time.time()
273             if throttle_time > 0:
274                 time.sleep(throttle_time)
275             self.next_request_time = time.time()
276             try:
277                 result = orig_func(self, *args, **kwargs)
278             except Exception as error:
279                 if self._cloud.is_cloud_exception(error):
280                     self.error_streak += 1
281                     self.next_request_time += min(2 ** self.error_streak,
282                                                   self.max_retry_wait)
283                 self._logger.warn(
284                     "Unhandled exception: %s", error, exc_info=error)
285             else:
286                 self.error_streak = 0
287                 return result
288         return throttle_wrapper
289
290     @_throttle_errors
291     def sync_node(self, cloud_node, arvados_node):
292         return self._cloud.sync_node(cloud_node, arvados_node)
293
294
295 class ComputeNodeMonitorActor(config.actor_class):
296     """Actor to manage a running compute node.
297
298     This actor gets updates about a compute node's cloud and Arvados records.
299     It uses this information to notify subscribers when the node is eligible
300     for shutdown.
301     """
302     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
303                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
304                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
305                  boot_fail_after=1800
306     ):
307         super(ComputeNodeMonitorActor, self).__init__()
308         self._later = self.actor_ref.tell_proxy()
309         self._last_log = None
310         self._shutdowns = shutdown_timer
311         self._cloud_node_fqdn = cloud_fqdn_func
312         self._timer = timer_actor
313         self._update = update_actor
314         self._cloud = cloud_client
315         self.cloud_node = cloud_node
316         self.cloud_node_start_time = cloud_node_start_time
317         self.poll_stale_after = poll_stale_after
318         self.node_stale_after = node_stale_after
319         self.boot_fail_after = boot_fail_after
320         self.subscribers = set()
321         self.arvados_node = None
322         self._later.update_arvados_node(arvados_node)
323         self.last_shutdown_opening = None
324         self._later.consider_shutdown()
325
326     def _set_logger(self):
327         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
328
329     def on_start(self):
330         self._set_logger()
331         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
332
333     def subscribe(self, subscriber):
334         self.subscribers.add(subscriber)
335
336     def _debug(self, msg, *args):
337         if msg == self._last_log:
338             return
339         self._last_log = msg
340         self._logger.debug(msg, *args)
341
342     def in_state(self, *states):
343         # Return a boolean to say whether or not our Arvados node record is in
344         # one of the given states.  If state information is not
345         # available--because this node has no Arvados record, the record is
346         # stale, or the record has no state information--return None.
347         if (self.arvados_node is None) or not timestamp_fresh(
348               arvados_node_mtime(self.arvados_node), self.node_stale_after):
349             return None
350         state = self.arvados_node['crunch_worker_state']
351         if not state:
352             return None
353
354         # There's a window between when a node pings for the first time and the
355         # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
356         # window, the node will still report as 'down'.  Check that
357         # first_ping_at is truthy and consider the node 'idle' during the
358         # initial boot grace period.
359         if (state == 'down' and
360             self.arvados_node['first_ping_at'] and
361             timestamp_fresh(self.cloud_node_start_time,
362                             self.boot_fail_after) and
363             not self._cloud.broken(self.cloud_node)):
364             state = 'idle'
365
366         # "missing" means last_ping_at is stale, this should be
367         # considered "down"
368         if arvados_node_missing(self.arvados_node, self.node_stale_after):
369             state = 'down'
370
371         result = state in states
372         if state == 'idle':
373             result = result and not self.arvados_node['job_uuid']
374
375         return result
376
377     def shutdown_eligible(self):
378         """Determine if node is candidate for shut down.
379
380         Returns a tuple of (boolean, string) where the first value is whether
381         the node is candidate for shut down, and the second value is the
382         reason for the decision.
383         """
384
385         # Collect states and then consult state transition table whether we
386         # should shut down.  Possible states are:
387         # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
388         # window = ["open", "closed"]
389         # boot_grace = ["boot wait", "boot exceeded"]
390         # idle_grace = ["not idle", "idle wait", "idle exceeded"]
391
392         if self.arvados_node is None:
393             crunch_worker_state = 'unpaired'
394         elif not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
395             return (False, "node state is stale")
396         elif self.in_state('down'):
397             crunch_worker_state = 'down'
398         elif self.in_state('idle'):
399             crunch_worker_state = 'idle'
400         elif self.in_state('busy'):
401             crunch_worker_state = 'busy'
402         else:
403             return (False, "node is paired but crunch_worker_state is '%s'" % self.arvados_node['crunch_worker_state'])
404
405         window = "open" if self._shutdowns.window_open() else "closed"
406
407         if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
408             boot_grace = "boot wait"
409         else:
410             boot_grace = "boot exceeded"
411
412         # API server side not implemented yet.
413         idle_grace = 'idle exceeded'
414
415         node_state = (crunch_worker_state, window, boot_grace, idle_grace)
416         t = transitions[node_state]
417         if t is not None:
418             # yes, shutdown eligible
419             return (True, "node state is %s" % (node_state,))
420         else:
421             # no, return a reason
422             return (False, "node state is %s" % (node_state,))
423
424     def consider_shutdown(self):
425         try:
426             eligible, reason = self.shutdown_eligible()
427             next_opening = self._shutdowns.next_opening()
428             if eligible:
429                 self._debug("Suggesting shutdown because %s", reason)
430                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
431             else:
432                 self._debug("Not eligible for shut down because %s", reason)
433
434                 if self.last_shutdown_opening != next_opening:
435                     self._debug("Shutdown window closed.  Next at %s.",
436                                 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
437                     self._timer.schedule(next_opening, self._later.consider_shutdown)
438                     self.last_shutdown_opening = next_opening
439         except Exception:
440             self._logger.exception("Unexpected exception")
441
442     def offer_arvados_pair(self, arvados_node):
443         first_ping_s = arvados_node.get('first_ping_at')
444         if (self.arvados_node is not None) or (not first_ping_s):
445             return None
446         elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
447               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
448             self._later.update_arvados_node(arvados_node)
449             return self.cloud_node.id
450         else:
451             return None
452
453     def update_cloud_node(self, cloud_node):
454         if cloud_node is not None:
455             self.cloud_node = cloud_node
456             self._later.consider_shutdown()
457
458     def update_arvados_node(self, arvados_node):
459         # If the cloud node's FQDN doesn't match what's in the Arvados node
460         # record, make them match.
461         # This method is a little unusual in the way it just fires off the
462         # request without checking the result or retrying errors.  That's
463         # because this update happens every time we reload the Arvados node
464         # list: if a previous sync attempt failed, we'll see that the names
465         # are out of sync and just try again.  ComputeNodeUpdateActor has
466         # the logic to throttle those effective retries when there's trouble.
467         if arvados_node is not None:
468             self.arvados_node = arvados_node
469             if (self._cloud_node_fqdn(self.cloud_node) !=
470                   arvados_node_fqdn(self.arvados_node)):
471                 self._update.sync_node(self.cloud_node, self.arvados_node)
472             self._later.consider_shutdown()