b43e8a38ba0fd1cf2f07d25d20f2fc61b7202994
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14     arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
17 from .transitions import transitions
18
19 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
20     """Base class for actors that change a compute node's state.
21
22     This base class takes care of retrying changes and notifying
23     subscribers when the change is finished.
24     """
25     def __init__(self, cloud_client, arvados_client, timer_actor,
26                  retry_wait, max_retry_wait):
27         super(ComputeNodeStateChangeBase, self).__init__()
28         RetryMixin.__init__(self, retry_wait, max_retry_wait,
29                             None, cloud_client, timer_actor)
30         self._later = self.actor_ref.tell_proxy()
31         self._arvados = arvados_client
32         self.subscribers = set()
33
34     def _set_logger(self):
35         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
36
37     def on_start(self):
38         self._set_logger()
39
40     def _finished(self):
41         if self.subscribers is None:
42             raise Exception("Actor tried to finish twice")
43         _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
44         self.subscribers = None
45         self._logger.info("finished")
46
47     def subscribe(self, subscriber):
48         if self.subscribers is None:
49             try:
50                 subscriber(self.actor_ref.proxy())
51             except pykka.ActorDeadError:
52                 pass
53         else:
54             self.subscribers.add(subscriber)
55
56     def _clean_arvados_node(self, arvados_node, explanation):
57         return self._arvados.nodes().update(
58             uuid=arvados_node['uuid'],
59             body={'hostname': None,
60                   'ip_address': None,
61                   'slot_number': None,
62                   'first_ping_at': None,
63                   'last_ping_at': None,
64                   'properties': {},
65                   'info': {'ec2_instance_id': None,
66                            'last_action': explanation}},
67             ).execute()
68
69     @staticmethod
70     def _finish_on_exception(orig_func):
71         @functools.wraps(orig_func)
72         def finish_wrapper(self, *args, **kwargs):
73             try:
74                 return orig_func(self, *args, **kwargs)
75             except Exception as error:
76                 self._logger.error("Actor error %s", error)
77                 self._finished()
78         return finish_wrapper
79
80
81 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
82     """Actor to create and set up a cloud compute node.
83
84     This actor prepares an Arvados node record for a new compute node
85     (either creating one or cleaning one passed in), then boots the
86     actual compute node.  It notifies subscribers when the cloud node
87     is successfully created (the last step in the process for Node
88     Manager to handle).
89     """
90     def __init__(self, timer_actor, arvados_client, cloud_client,
91                  cloud_size, arvados_node,
92                  retry_wait=1, max_retry_wait=180):
93         super(ComputeNodeSetupActor, self).__init__(
94             cloud_client, arvados_client, timer_actor,
95             retry_wait, max_retry_wait)
96         self.cloud_size = cloud_size
97         self.arvados_node = None
98         self.cloud_node = None
99         self._later.prepare_arvados_node(arvados_node)
100
101     @ComputeNodeStateChangeBase._finish_on_exception
102     @RetryMixin._retry(config.ARVADOS_ERRORS)
103     def prepare_arvados_node(self, node):
104         self.arvados_node = self._clean_arvados_node(
105             node, "Prepared by Node Manager")
106         self._later.create_cloud_node()
107
108     @ComputeNodeStateChangeBase._finish_on_exception
109     @RetryMixin._retry()
110     def create_cloud_node(self):
111         self._logger.info("Sending create_node request for node size %s.",
112                           self.cloud_size.name)
113         self.cloud_node = self._cloud.create_node(self.cloud_size,
114                                                   self.arvados_node)
115         if not self.cloud_node.size:
116              self.cloud_node.size = self.cloud_size
117         self._logger.info("Cloud node %s created.", self.cloud_node.id)
118         self._later.update_arvados_node_properties()
119
120     @ComputeNodeStateChangeBase._finish_on_exception
121     @RetryMixin._retry(config.ARVADOS_ERRORS)
122     def update_arvados_node_properties(self):
123         """Tell Arvados some details about the cloud node.
124
125         Currently we only include size/price from our request, which
126         we already knew before create_cloud_node(), but doing it here
127         gives us an opportunity to provide more detail from
128         self.cloud_node, too.
129         """
130         self.arvados_node['properties']['cloud_node'] = {
131             # Note this 'size' is the node size we asked the cloud
132             # driver to create -- not necessarily equal to the size
133             # reported by the cloud driver for the node that was
134             # created.
135             'size': self.cloud_size.id,
136             'price': self.cloud_size.price,
137         }
138         self.arvados_node = self._arvados.nodes().update(
139             uuid=self.arvados_node['uuid'],
140             body={'properties': self.arvados_node['properties']},
141         ).execute()
142         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
143         self._later.post_create()
144
145     @RetryMixin._retry()
146     def post_create(self):
147         self._cloud.post_create_node(self.cloud_node)
148         self._logger.info("%s post-create work done.", self.cloud_node.id)
149         self._finished()
150
151     def stop_if_no_cloud_node(self):
152         if self.cloud_node is not None:
153             return False
154         self.stop()
155         return True
156
157
158 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
159     """Actor to shut down a compute node.
160
161     This actor simply destroys a cloud node, retrying as needed.
162     """
163     # Reasons for a shutdown to be cancelled.
164     WINDOW_CLOSED = "shutdown window closed"
165     NODE_BROKEN = "cloud failed to shut down broken node"
166
167     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
168                  cancellable=True, retry_wait=1, max_retry_wait=180):
169         # If a ShutdownActor is cancellable, it will ask the
170         # ComputeNodeMonitorActor if it's still eligible before taking each
171         # action, and stop the shutdown process if the node is no longer
172         # eligible.  Normal shutdowns based on job demand should be
173         # cancellable; shutdowns based on node misbehavior should not.
174         super(ComputeNodeShutdownActor, self).__init__(
175             cloud_client, arvados_client, timer_actor,
176             retry_wait, max_retry_wait)
177         self._monitor = node_monitor.proxy()
178         self.cloud_node = self._monitor.cloud_node.get()
179         self.cancellable = cancellable
180         self.cancel_reason = None
181         self.success = None
182
183     def _set_logger(self):
184         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
185
186     def on_start(self):
187         super(ComputeNodeShutdownActor, self).on_start()
188         self._later.shutdown_node()
189
190     def _arvados_node(self):
191         return self._monitor.arvados_node.get()
192
193     def _finished(self, success_flag=None):
194         if success_flag is not None:
195             self.success = success_flag
196         return super(ComputeNodeShutdownActor, self)._finished()
197
198     def cancel_shutdown(self, reason):
199         self.cancel_reason = reason
200         self._logger.info("Shutdown cancelled: %s.", reason)
201         self._finished(success_flag=False)
202
203     def _cancel_on_exception(orig_func):
204         @functools.wraps(orig_func)
205         def finish_wrapper(self, *args, **kwargs):
206             try:
207                 return orig_func(self, *args, **kwargs)
208             except Exception as error:
209                 self._logger.error("Actor error %s", error)
210                 self._later.cancel_shutdown("Unhandled exception %s" % error)
211         return finish_wrapper
212
213     @_cancel_on_exception
214     @RetryMixin._retry()
215     def shutdown_node(self):
216         self._logger.info("Starting shutdown")
217         if not self._cloud.destroy_node(self.cloud_node):
218             if self._cloud.broken(self.cloud_node):
219                 self._later.cancel_shutdown(self.NODE_BROKEN)
220                 return
221             else:
222                 # Force a retry.
223                 raise cloud_types.LibcloudError("destroy_node failed")
224         self._logger.info("Shutdown success")
225         arv_node = self._arvados_node()
226         if arv_node is None:
227             self._finished(success_flag=True)
228         else:
229             self._later.clean_arvados_node(arv_node)
230
231     @ComputeNodeStateChangeBase._finish_on_exception
232     @RetryMixin._retry(config.ARVADOS_ERRORS)
233     def clean_arvados_node(self, arvados_node):
234         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
235         self._finished(success_flag=True)
236
237
238 class ComputeNodeUpdateActor(config.actor_class):
239     """Actor to dispatch one-off cloud management requests.
240
241     This actor receives requests for small cloud updates, and
242     dispatches them to a real driver.  ComputeNodeMonitorActors use
243     this to perform maintenance tasks on themselves.  Having a
244     dedicated actor for this gives us the opportunity to control the
245     flow of requests; e.g., by backing off when errors occur.
246     """
247     def __init__(self, cloud_factory, max_retry_wait=180):
248         super(ComputeNodeUpdateActor, self).__init__()
249         self._cloud = cloud_factory()
250         self.max_retry_wait = max_retry_wait
251         self.error_streak = 0
252         self.next_request_time = time.time()
253
254     def _set_logger(self):
255         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
256
257     def on_start(self):
258         self._set_logger()
259
260     def _throttle_errors(orig_func):
261         @functools.wraps(orig_func)
262         def throttle_wrapper(self, *args, **kwargs):
263             throttle_time = self.next_request_time - time.time()
264             if throttle_time > 0:
265                 time.sleep(throttle_time)
266             self.next_request_time = time.time()
267             try:
268                 result = orig_func(self, *args, **kwargs)
269             except Exception as error:
270                 if self._cloud.is_cloud_exception(error):
271                     self.error_streak += 1
272                     self.next_request_time += min(2 ** self.error_streak,
273                                                   self.max_retry_wait)
274                 self._logger.warn(
275                     "Unhandled exception: %s", error, exc_info=error)
276             else:
277                 self.error_streak = 0
278                 return result
279         return throttle_wrapper
280
281     @_throttle_errors
282     def sync_node(self, cloud_node, arvados_node):
283         return self._cloud.sync_node(cloud_node, arvados_node)
284
285
286 class ComputeNodeMonitorActor(config.actor_class):
287     """Actor to manage a running compute node.
288
289     This actor gets updates about a compute node's cloud and Arvados records.
290     It uses this information to notify subscribers when the node is eligible
291     for shutdown.
292     """
293     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
294                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
295                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
296                  boot_fail_after=1800
297     ):
298         super(ComputeNodeMonitorActor, self).__init__()
299         self._later = self.actor_ref.tell_proxy()
300         self._last_log = None
301         self._shutdowns = shutdown_timer
302         self._cloud_node_fqdn = cloud_fqdn_func
303         self._timer = timer_actor
304         self._update = update_actor
305         self._cloud = cloud_client
306         self.cloud_node = cloud_node
307         self.cloud_node_start_time = cloud_node_start_time
308         self.poll_stale_after = poll_stale_after
309         self.node_stale_after = node_stale_after
310         self.boot_fail_after = boot_fail_after
311         self.subscribers = set()
312         self.arvados_node = None
313         self._later.update_arvados_node(arvados_node)
314         self.last_shutdown_opening = None
315         self._later.consider_shutdown()
316
317     def _set_logger(self):
318         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
319
320     def on_start(self):
321         self._set_logger()
322         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
323
324     def subscribe(self, subscriber):
325         self.subscribers.add(subscriber)
326
327     def _debug(self, msg, *args):
328         if msg == self._last_log:
329             return
330         self._last_log = msg
331         self._logger.debug(msg, *args)
332
333     def in_state(self, *states):
334         # Return a boolean to say whether or not our Arvados node record is in
335         # one of the given states.  If state information is not
336         # available--because this node has no Arvados record, the record is
337         # stale, or the record has no state information--return None.
338         if (self.arvados_node is None) or not timestamp_fresh(
339               arvados_node_mtime(self.arvados_node), self.node_stale_after):
340             return None
341         state = self.arvados_node['crunch_worker_state']
342         if not state:
343             return None
344
345         # There's a window between when a node pings for the first time and the
346         # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
347         # window, the node will still report as 'down'.  Check that
348         # first_ping_at is truthy and consider the node 'idle' during the
349         # initial boot grace period.
350         if (state == 'down' and
351             self.arvados_node['first_ping_at'] and
352             timestamp_fresh(self.cloud_node_start_time,
353                             self.boot_fail_after)):
354             state = 'idle'
355
356         # "missing" means last_ping_at is stale, this should be
357         # considered "down"
358         if arvados_node_missing(self.arvados_node, self.node_stale_after):
359             state = 'down'
360
361         result = state in states
362         if state == 'idle':
363             result = result and not self.arvados_node['job_uuid']
364         return result
365
366     def shutdown_eligible(self):
367         """Determine if node is candidate for shut down.
368
369         Returns a tuple of (boolean, string) where the first value is whether
370         the node is candidate for shut down, and the second value is the
371         reason for the decision.
372         """
373
374         # Collect states and then consult state transition table whether we
375         # should shut down.  Possible states are:
376         # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
377         # window = ["open", "closed"]
378         # boot_grace = ["boot wait", "boot exceeded"]
379         # idle_grace = ["not idle", "idle wait", "idle exceeded"]
380
381         if self.arvados_node is None:
382             crunch_worker_state = 'unpaired'
383         elif not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
384             return (False, "node state is stale")
385         elif self.in_state('down'):
386             crunch_worker_state = 'down'
387         elif self.in_state('idle'):
388             crunch_worker_state = 'idle'
389         elif self.in_state('busy'):
390             crunch_worker_state = 'busy'
391         else:
392             return (False, "node is paired but crunch_worker_state is '%s'" % self.arvados_node['crunch_worker_state'])
393
394         window = "open" if self._shutdowns.window_open() else "closed"
395
396         if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
397             boot_grace = "boot wait"
398         else:
399             boot_grace = "boot exceeded"
400
401         # API server side not implemented yet.
402         idle_grace = 'idle exceeded'
403
404         node_state = (crunch_worker_state, window, boot_grace, idle_grace)
405         t = transitions[node_state]
406         if t is not None:
407             # yes, shutdown eligible
408             return (True, "node state is %s" % (node_state,))
409         else:
410             # no, return a reason
411             return (False, "node state is %s" % (node_state,))
412
413     def consider_shutdown(self):
414         try:
415             eligible, reason = self.shutdown_eligible()
416             next_opening = self._shutdowns.next_opening()
417             if eligible:
418                 self._debug("Suggesting shutdown because %s", reason)
419                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
420             else:
421                 self._debug("Not eligible for shut down because %s", reason)
422
423                 if self.last_shutdown_opening != next_opening:
424                     self._debug("Shutdown window closed.  Next at %s.",
425                                 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
426                     self._timer.schedule(next_opening, self._later.consider_shutdown)
427                     self.last_shutdown_opening = next_opening
428         except Exception:
429             self._logger.exception("Unexpected exception")
430
431     def offer_arvados_pair(self, arvados_node):
432         first_ping_s = arvados_node.get('first_ping_at')
433         if (self.arvados_node is not None) or (not first_ping_s):
434             return None
435         elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
436               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
437             self._later.update_arvados_node(arvados_node)
438             return self.cloud_node.id
439         else:
440             return None
441
442     def update_cloud_node(self, cloud_node):
443         if cloud_node is not None:
444             self.cloud_node = cloud_node
445             self._later.consider_shutdown()
446
447     def update_arvados_node(self, arvados_node):
448         # If the cloud node's FQDN doesn't match what's in the Arvados node
449         # record, make them match.
450         # This method is a little unusual in the way it just fires off the
451         # request without checking the result or retrying errors.  That's
452         # because this update happens every time we reload the Arvados node
453         # list: if a previous sync attempt failed, we'll see that the names
454         # are out of sync and just try again.  ComputeNodeUpdateActor has
455         # the logic to throttle those effective retries when there's trouble.
456         if arvados_node is not None:
457             self.arvados_node = arvados_node
458             if (self._cloud_node_fqdn(self.cloud_node) !=
459                   arvados_node_fqdn(self.arvados_node)):
460                 self._update.sync_node(self.cloud_node, self.arvados_node)
461             self._later.consider_shutdown()