10218: Merge branch 'master' into 10218-record-node-info
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14     arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
17 from .transitions import transitions
18
19 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
20     """Base class for actors that change a compute node's state.
21
22     This base class takes care of retrying changes and notifying
23     subscribers when the change is finished.
24     """
25     def __init__(self, cloud_client, arvados_client, timer_actor,
26                  retry_wait, max_retry_wait):
27         super(ComputeNodeStateChangeBase, self).__init__()
28         RetryMixin.__init__(self, retry_wait, max_retry_wait,
29                             None, cloud_client, timer_actor)
30         self._later = self.actor_ref.tell_proxy()
31         self._arvados = arvados_client
32         self.subscribers = set()
33
34     def _set_logger(self):
35         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
36
37     def on_start(self):
38         self._set_logger()
39
40     def _finished(self):
41         if self.subscribers is None:
42             raise Exception("Actor tried to finish twice")
43         _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
44         self.subscribers = None
45         self._logger.info("finished")
46
47     def subscribe(self, subscriber):
48         if self.subscribers is None:
49             try:
50                 subscriber(self.actor_ref.proxy())
51             except pykka.ActorDeadError:
52                 pass
53         else:
54             self.subscribers.add(subscriber)
55
56     def _clean_arvados_node(self, arvados_node, explanation):
57         return self._arvados.nodes().update(
58             uuid=arvados_node['uuid'],
59             body={'hostname': None,
60                   'ip_address': None,
61                   'slot_number': None,
62                   'first_ping_at': None,
63                   'last_ping_at': None,
64                   'properties': {},
65                   'info': {'ec2_instance_id': None,
66                            'last_action': explanation}},
67             ).execute()
68
69     @staticmethod
70     def _finish_on_exception(orig_func):
71         @functools.wraps(orig_func)
72         def finish_wrapper(self, *args, **kwargs):
73             try:
74                 return orig_func(self, *args, **kwargs)
75             except Exception as error:
76                 self._logger.error("Actor error %s", error)
77                 self._finished()
78         return finish_wrapper
79
80
81 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
82     """Actor to create and set up a cloud compute node.
83
84     This actor prepares an Arvados node record for a new compute node
85     (either creating one or cleaning one passed in), then boots the
86     actual compute node.  It notifies subscribers when the cloud node
87     is successfully created (the last step in the process for Node
88     Manager to handle).
89     """
90     def __init__(self, timer_actor, arvados_client, cloud_client,
91                  cloud_size, arvados_node=None,
92                  retry_wait=1, max_retry_wait=180):
93         super(ComputeNodeSetupActor, self).__init__(
94             cloud_client, arvados_client, timer_actor,
95             retry_wait, max_retry_wait)
96         self.cloud_size = cloud_size
97         self.arvados_node = None
98         self.cloud_node = None
99         if arvados_node is None:
100             self._later.create_arvados_node()
101         else:
102             self._later.prepare_arvados_node(arvados_node)
103
104     @ComputeNodeStateChangeBase._finish_on_exception
105     @RetryMixin._retry(config.ARVADOS_ERRORS)
106     def create_arvados_node(self):
107         self.arvados_node = self._arvados.nodes().create(body={}).execute()
108         self._later.create_cloud_node()
109
110     @ComputeNodeStateChangeBase._finish_on_exception
111     @RetryMixin._retry(config.ARVADOS_ERRORS)
112     def prepare_arvados_node(self, node):
113         self.arvados_node = self._clean_arvados_node(
114             node, "Prepared by Node Manager")
115         self._later.create_cloud_node()
116
117     @ComputeNodeStateChangeBase._finish_on_exception
118     @RetryMixin._retry()
119     def create_cloud_node(self):
120         self._logger.info("Sending create_node request for node size %s.",
121                           self.cloud_size.name)
122         self.cloud_node = self._cloud.create_node(self.cloud_size,
123                                                   self.arvados_node)
124         if not self.cloud_node.size:
125              self.cloud_node.size = self.cloud_size
126         self._logger.info("Cloud node %s created.", self.cloud_node.id)
127         self._later.update_arvados_node_properties()
128
129     @ComputeNodeStateChangeBase._finish_on_exception
130     @RetryMixin._retry(config.ARVADOS_ERRORS)
131     def update_arvados_node_properties(self):
132         """Tell Arvados some details about the cloud node.
133
134         Currently we only include size/price from our request, which
135         we already knew before create_cloud_node(), but doing it here
136         gives us an opportunity to provide more detail from
137         self.cloud_node, too.
138         """
139         self.arvados_node['properties']['cloud_node'] = {
140             # Note this 'size' is the node size we asked the cloud
141             # driver to create -- not necessarily equal to the size
142             # reported by the cloud driver for the node that was
143             # created.
144             'size': self.cloud_size.id,
145             'price': self.cloud_size.price,
146         }
147         self.arvados_node = self._arvados.nodes().update(
148             uuid=self.arvados_node['uuid'],
149             body={'properties': self.arvados_node['properties']},
150         ).execute()
151         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
152         self._later.post_create()
153
154     @RetryMixin._retry()
155     def post_create(self):
156         self._cloud.post_create_node(self.cloud_node)
157         self._logger.info("%s post-create work done.", self.cloud_node.id)
158         self._finished()
159
160     def stop_if_no_cloud_node(self):
161         if self.cloud_node is not None:
162             return False
163         self.stop()
164         return True
165
166
167 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
168     """Actor to shut down a compute node.
169
170     This actor simply destroys a cloud node, retrying as needed.
171     """
172     # Reasons for a shutdown to be cancelled.
173     WINDOW_CLOSED = "shutdown window closed"
174     DESTROY_FAILED = "destroy_node failed"
175
176     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
177                  cancellable=True, retry_wait=1, max_retry_wait=180):
178         # If a ShutdownActor is cancellable, it will ask the
179         # ComputeNodeMonitorActor if it's still eligible before taking each
180         # action, and stop the shutdown process if the node is no longer
181         # eligible.  Normal shutdowns based on job demand should be
182         # cancellable; shutdowns based on node misbehavior should not.
183         super(ComputeNodeShutdownActor, self).__init__(
184             cloud_client, arvados_client, timer_actor,
185             retry_wait, max_retry_wait)
186         self._monitor = node_monitor.proxy()
187         self.cloud_node = self._monitor.cloud_node.get()
188         self.cancellable = cancellable
189         self.cancel_reason = None
190         self.success = None
191
192     def _set_logger(self):
193         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
194
195     def on_start(self):
196         super(ComputeNodeShutdownActor, self).on_start()
197         self._later.shutdown_node()
198
199     def _arvados_node(self):
200         return self._monitor.arvados_node.get()
201
202     def _finished(self, success_flag=None):
203         if success_flag is not None:
204             self.success = success_flag
205         return super(ComputeNodeShutdownActor, self)._finished()
206
207     def cancel_shutdown(self, reason, **kwargs):
208         self.cancel_reason = reason
209         self._logger.info("Shutdown cancelled: %s.", reason)
210         self._finished(success_flag=False)
211
212     def _cancel_on_exception(orig_func):
213         @functools.wraps(orig_func)
214         def finish_wrapper(self, *args, **kwargs):
215             try:
216                 return orig_func(self, *args, **kwargs)
217             except Exception as error:
218                 self._logger.error("Actor error %s", error)
219                 self._logger.debug("", exc_info=True)
220                 self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
221         return finish_wrapper
222
223     @_cancel_on_exception
224     def shutdown_node(self):
225         if self.cancellable:
226             self._logger.info("Checking that node is still eligible for shutdown")
227             eligible, reason = self._monitor.shutdown_eligible().get()
228             if not eligible:
229                 self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
230                                      try_resume=True)
231                 return
232         self._destroy_node()
233
234     def _destroy_node(self):
235         self._logger.info("Starting shutdown")
236         arv_node = self._arvados_node()
237         if self._cloud.destroy_node(self.cloud_node):
238             self._logger.info("Shutdown success")
239             if arv_node:
240                 self._later.clean_arvados_node(arv_node)
241             else:
242                 self._finished(success_flag=True)
243         else:
244             self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)
245
246     @ComputeNodeStateChangeBase._finish_on_exception
247     @RetryMixin._retry(config.ARVADOS_ERRORS)
248     def clean_arvados_node(self, arvados_node):
249         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
250         self._finished(success_flag=True)
251
252
253 class ComputeNodeUpdateActor(config.actor_class):
254     """Actor to dispatch one-off cloud management requests.
255
256     This actor receives requests for small cloud updates, and
257     dispatches them to a real driver.  ComputeNodeMonitorActors use
258     this to perform maintenance tasks on themselves.  Having a
259     dedicated actor for this gives us the opportunity to control the
260     flow of requests; e.g., by backing off when errors occur.
261     """
262     def __init__(self, cloud_factory, max_retry_wait=180):
263         super(ComputeNodeUpdateActor, self).__init__()
264         self._cloud = cloud_factory()
265         self.max_retry_wait = max_retry_wait
266         self.error_streak = 0
267         self.next_request_time = time.time()
268
269     def _set_logger(self):
270         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
271
272     def on_start(self):
273         self._set_logger()
274
275     def _throttle_errors(orig_func):
276         @functools.wraps(orig_func)
277         def throttle_wrapper(self, *args, **kwargs):
278             throttle_time = self.next_request_time - time.time()
279             if throttle_time > 0:
280                 time.sleep(throttle_time)
281             self.next_request_time = time.time()
282             try:
283                 result = orig_func(self, *args, **kwargs)
284             except Exception as error:
285                 if self._cloud.is_cloud_exception(error):
286                     self.error_streak += 1
287                     self.next_request_time += min(2 ** self.error_streak,
288                                                   self.max_retry_wait)
289                 self._logger.warn(
290                     "Unhandled exception: %s", error, exc_info=error)
291             else:
292                 self.error_streak = 0
293                 return result
294         return throttle_wrapper
295
296     @_throttle_errors
297     def sync_node(self, cloud_node, arvados_node):
298         return self._cloud.sync_node(cloud_node, arvados_node)
299
300
301 class ComputeNodeMonitorActor(config.actor_class):
302     """Actor to manage a running compute node.
303
304     This actor gets updates about a compute node's cloud and Arvados records.
305     It uses this information to notify subscribers when the node is eligible
306     for shutdown.
307     """
308     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
309                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
310                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
311                  boot_fail_after=1800
312     ):
313         super(ComputeNodeMonitorActor, self).__init__()
314         self._later = self.actor_ref.tell_proxy()
315         self._shutdowns = shutdown_timer
316         self._cloud_node_fqdn = cloud_fqdn_func
317         self._timer = timer_actor
318         self._update = update_actor
319         self._cloud = cloud_client
320         self.cloud_node = cloud_node
321         self.cloud_node_start_time = cloud_node_start_time
322         self.poll_stale_after = poll_stale_after
323         self.node_stale_after = node_stale_after
324         self.boot_fail_after = boot_fail_after
325         self.subscribers = set()
326         self.arvados_node = None
327         self._later.update_arvados_node(arvados_node)
328         self.last_shutdown_opening = None
329         self._later.consider_shutdown()
330
331     def _set_logger(self):
332         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
333
334     def on_start(self):
335         self._set_logger()
336         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
337
338     def subscribe(self, subscriber):
339         self.subscribers.add(subscriber)
340
341     def _debug(self, msg, *args):
342         self._logger.debug(msg, *args)
343
344     def get_state(self):
345         """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
346
347         # If this node is not associated with an Arvados node, return 'unpaired'.
348         if self.arvados_node is None:
349             return 'unpaired'
350
351         # This node is indicated as non-functioning by the cloud
352         if self._cloud.broken(self.cloud_node):
353             return 'down'
354
355         state = self.arvados_node['crunch_worker_state']
356
357         # If state information is not available because it is missing or the
358         # record is stale, return 'down'.
359         if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
360                                             self.node_stale_after):
361             state = 'down'
362
363         # There's a window between when a node pings for the first time and the
364         # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
365         # window, the node will still report as 'down'.  Check that
366         # first_ping_at is truthy and consider the node 'idle' during the
367         # initial boot grace period.
368         if (state == 'down' and
369             self.arvados_node['first_ping_at'] and
370             timestamp_fresh(self.cloud_node_start_time,
371                             self.boot_fail_after) and
372             not self._cloud.broken(self.cloud_node)):
373             state = 'idle'
374
375         # "missing" means last_ping_at is stale, this should be
376         # considered "down"
377         if arvados_node_missing(self.arvados_node, self.node_stale_after):
378             state = 'down'
379
380         # Turns out using 'job_uuid' this way is a bad idea.  The node record
381         # is assigned the job_uuid before the job is locked (which removes it
382         # from the queue) which means the job will be double-counted as both in
383         # the wishlist and but also keeping a node busy.  This end result is
384         # excess nodes being booted.
385         #if state == 'idle' and self.arvados_node['job_uuid']:
386         #    state = 'busy'
387
388         return state
389
390     def in_state(self, *states):
391         return self.get_state() in states
392
393     def shutdown_eligible(self):
394         """Determine if node is candidate for shut down.
395
396         Returns a tuple of (boolean, string) where the first value is whether
397         the node is candidate for shut down, and the second value is the
398         reason for the decision.
399         """
400
401         # Collect states and then consult state transition table whether we
402         # should shut down.  Possible states are:
403         # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
404         # window = ["open", "closed"]
405         # boot_grace = ["boot wait", "boot exceeded"]
406         # idle_grace = ["not idle", "idle wait", "idle exceeded"]
407
408         if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
409             return (False, "node state is stale")
410
411         crunch_worker_state = self.get_state()
412
413         window = "open" if self._shutdowns.window_open() else "closed"
414
415         if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
416             boot_grace = "boot wait"
417         else:
418             boot_grace = "boot exceeded"
419
420         # API server side not implemented yet.
421         idle_grace = 'idle exceeded'
422
423         node_state = (crunch_worker_state, window, boot_grace, idle_grace)
424         t = transitions[node_state]
425         if t is not None:
426             # yes, shutdown eligible
427             return (True, "node state is %s" % (node_state,))
428         else:
429             # no, return a reason
430             return (False, "node state is %s" % (node_state,))
431
432     def consider_shutdown(self):
433         try:
434             eligible, reason = self.shutdown_eligible()
435             next_opening = self._shutdowns.next_opening()
436             if eligible:
437                 self._debug("Suggesting shutdown because %s", reason)
438                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
439             else:
440                 self._debug("Not eligible for shut down because %s", reason)
441
442                 if self.last_shutdown_opening != next_opening:
443                     self._debug("Shutdown window closed.  Next at %s.",
444                                 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
445                     self._timer.schedule(next_opening, self._later.consider_shutdown)
446                     self.last_shutdown_opening = next_opening
447         except Exception:
448             self._logger.exception("Unexpected exception")
449
450     def offer_arvados_pair(self, arvados_node):
451         first_ping_s = arvados_node.get('first_ping_at')
452         if (self.arvados_node is not None) or (not first_ping_s):
453             return None
454         elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
455               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
456             self._later.update_arvados_node(arvados_node)
457             return self.cloud_node.id
458         else:
459             return None
460
461     def update_cloud_node(self, cloud_node):
462         if cloud_node is not None:
463             self.cloud_node = cloud_node
464             self._later.consider_shutdown()
465
466     def update_arvados_node(self, arvados_node):
467         # If the cloud node's FQDN doesn't match what's in the Arvados node
468         # record, make them match.
469         # This method is a little unusual in the way it just fires off the
470         # request without checking the result or retrying errors.  That's
471         # because this update happens every time we reload the Arvados node
472         # list: if a previous sync attempt failed, we'll see that the names
473         # are out of sync and just try again.  ComputeNodeUpdateActor has
474         # the logic to throttle those effective retries when there's trouble.
475         if arvados_node is not None:
476             self.arvados_node = arvados_node
477             if (self._cloud_node_fqdn(self.cloud_node) !=
478                   arvados_node_fqdn(self.arvados_node)):
479                 self._update.sync_node(self.cloud_node, self.arvados_node)
480             self._later.consider_shutdown()