11413: Fix issues with node manager on GCE:
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14     arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
17 from .transitions import transitions
18
19 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
20     """Base class for actors that change a compute node's state.
21
22     This base class takes care of retrying changes and notifying
23     subscribers when the change is finished.
24     """
25     def __init__(self, cloud_client, arvados_client, timer_actor,
26                  retry_wait, max_retry_wait):
27         super(ComputeNodeStateChangeBase, self).__init__()
28         RetryMixin.__init__(self, retry_wait, max_retry_wait,
29                             None, cloud_client, timer_actor)
30         self._later = self.actor_ref.tell_proxy()
31         self._arvados = arvados_client
32         self.subscribers = set()
33
34     def _set_logger(self):
35         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
36
37     def on_start(self):
38         self._set_logger()
39
40     def _finished(self):
41         if self.subscribers is None:
42             raise Exception("Actor tried to finish twice")
43         _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
44         self.subscribers = None
45         self._logger.info("finished")
46
47     def subscribe(self, subscriber):
48         if self.subscribers is None:
49             try:
50                 subscriber(self.actor_ref.proxy())
51             except pykka.ActorDeadError:
52                 pass
53         else:
54             self.subscribers.add(subscriber)
55
56     def _clean_arvados_node(self, arvados_node, explanation):
57         return self._arvados.nodes().update(
58             uuid=arvados_node['uuid'],
59             body={'hostname': None,
60                   'ip_address': None,
61                   'slot_number': None,
62                   'first_ping_at': None,
63                   'last_ping_at': None,
64                   'properties': {},
65                   'info': {'ec2_instance_id': None,
66                            'last_action': explanation}},
67             ).execute()
68
69     @staticmethod
70     def _finish_on_exception(orig_func):
71         @functools.wraps(orig_func)
72         def finish_wrapper(self, *args, **kwargs):
73             try:
74                 return orig_func(self, *args, **kwargs)
75             except Exception as error:
76                 self._logger.error("Actor error %s", error)
77                 self._finished()
78         return finish_wrapper
79
80
81 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
82     """Actor to create and set up a cloud compute node.
83
84     This actor prepares an Arvados node record for a new compute node
85     (either creating one or cleaning one passed in), then boots the
86     actual compute node.  It notifies subscribers when the cloud node
87     is successfully created (the last step in the process for Node
88     Manager to handle).
89     """
90     def __init__(self, timer_actor, arvados_client, cloud_client,
91                  cloud_size, arvados_node=None,
92                  retry_wait=1, max_retry_wait=180):
93         super(ComputeNodeSetupActor, self).__init__(
94             cloud_client, arvados_client, timer_actor,
95             retry_wait, max_retry_wait)
96         self.cloud_size = cloud_size
97         self.arvados_node = None
98         self.cloud_node = None
99         if arvados_node is None:
100             self._later.create_arvados_node()
101         else:
102             self._later.prepare_arvados_node(arvados_node)
103
104     @ComputeNodeStateChangeBase._finish_on_exception
105     @RetryMixin._retry(config.ARVADOS_ERRORS)
106     def create_arvados_node(self):
107         self.arvados_node = self._arvados.nodes().create(body={}).execute()
108         self._later.create_cloud_node()
109
110     @ComputeNodeStateChangeBase._finish_on_exception
111     @RetryMixin._retry(config.ARVADOS_ERRORS)
112     def prepare_arvados_node(self, node):
113         self.arvados_node = self._clean_arvados_node(
114             node, "Prepared by Node Manager")
115         self._later.create_cloud_node()
116
117     @ComputeNodeStateChangeBase._finish_on_exception
118     @RetryMixin._retry()
119     def create_cloud_node(self):
120         self._logger.info("Sending create_node request for node size %s.",
121                           self.cloud_size.name)
122         self.cloud_node = self._cloud.create_node(self.cloud_size,
123                                                   self.arvados_node)
124
125         # The information included in the node size object we get from libcloud
126         # is inconsistent between cloud providers.  Replace libcloud NodeSize
127         # object with compatible CloudSizeWrapper object which merges the size
128         # info reported from the cloud with size information from the
129         # configuration file.
130         self.cloud_node.size = self.cloud_size
131
132         self._logger.info("Cloud node %s created.", self.cloud_node.id)
133         self._later.update_arvados_node_properties()
134
135     @ComputeNodeStateChangeBase._finish_on_exception
136     @RetryMixin._retry(config.ARVADOS_ERRORS)
137     def update_arvados_node_properties(self):
138         """Tell Arvados some details about the cloud node.
139
140         Currently we only include size/price from our request, which
141         we already knew before create_cloud_node(), but doing it here
142         gives us an opportunity to provide more detail from
143         self.cloud_node, too.
144         """
145         self.arvados_node['properties']['cloud_node'] = {
146             # Note this 'size' is the node size we asked the cloud
147             # driver to create -- not necessarily equal to the size
148             # reported by the cloud driver for the node that was
149             # created.
150             'size': self.cloud_size.id,
151             'price': self.cloud_size.price,
152         }
153         self.arvados_node = self._arvados.nodes().update(
154             uuid=self.arvados_node['uuid'],
155             body={'properties': self.arvados_node['properties']},
156         ).execute()
157         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
158         self._later.post_create()
159
160     @RetryMixin._retry()
161     def post_create(self):
162         self._cloud.post_create_node(self.cloud_node)
163         self._logger.info("%s post-create work done.", self.cloud_node.id)
164         self._finished()
165
166     def stop_if_no_cloud_node(self):
167         if self.cloud_node is not None:
168             return False
169         self.stop()
170         return True
171
172
173 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
174     """Actor to shut down a compute node.
175
176     This actor simply destroys a cloud node, retrying as needed.
177     """
178     # Reasons for a shutdown to be cancelled.
179     WINDOW_CLOSED = "shutdown window closed"
180     DESTROY_FAILED = "destroy_node failed"
181
182     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
183                  cancellable=True, retry_wait=1, max_retry_wait=180):
184         # If a ShutdownActor is cancellable, it will ask the
185         # ComputeNodeMonitorActor if it's still eligible before taking each
186         # action, and stop the shutdown process if the node is no longer
187         # eligible.  Normal shutdowns based on job demand should be
188         # cancellable; shutdowns based on node misbehavior should not.
189         super(ComputeNodeShutdownActor, self).__init__(
190             cloud_client, arvados_client, timer_actor,
191             retry_wait, max_retry_wait)
192         self._monitor = node_monitor.proxy()
193         self.cloud_node = self._monitor.cloud_node.get()
194         self.cancellable = cancellable
195         self.cancel_reason = None
196         self.success = None
197
198     def _set_logger(self):
199         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
200
201     def on_start(self):
202         super(ComputeNodeShutdownActor, self).on_start()
203         self._later.shutdown_node()
204
205     def _arvados_node(self):
206         return self._monitor.arvados_node.get()
207
208     def _finished(self, success_flag=None):
209         if success_flag is not None:
210             self.success = success_flag
211         return super(ComputeNodeShutdownActor, self)._finished()
212
213     def cancel_shutdown(self, reason, **kwargs):
214         self.cancel_reason = reason
215         self._logger.info("Shutdown cancelled: %s.", reason)
216         self._finished(success_flag=False)
217
218     def _cancel_on_exception(orig_func):
219         @functools.wraps(orig_func)
220         def finish_wrapper(self, *args, **kwargs):
221             try:
222                 return orig_func(self, *args, **kwargs)
223             except Exception as error:
224                 self._logger.error("Actor error %s", error)
225                 self._logger.debug("", exc_info=True)
226                 self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
227         return finish_wrapper
228
229     @_cancel_on_exception
230     def shutdown_node(self):
231         if self.cancellable:
232             self._logger.info("Checking that node is still eligible for shutdown")
233             eligible, reason = self._monitor.shutdown_eligible().get()
234             if not eligible:
235                 self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
236                                      try_resume=True)
237                 return
238         self._destroy_node()
239
240     def _destroy_node(self):
241         self._logger.info("Starting shutdown")
242         arv_node = self._arvados_node()
243         if self._cloud.destroy_node(self.cloud_node):
244             self._logger.info("Shutdown success")
245             if arv_node:
246                 self._later.clean_arvados_node(arv_node)
247             else:
248                 self._finished(success_flag=True)
249         else:
250             self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)
251
252     @ComputeNodeStateChangeBase._finish_on_exception
253     @RetryMixin._retry(config.ARVADOS_ERRORS)
254     def clean_arvados_node(self, arvados_node):
255         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
256         self._finished(success_flag=True)
257
258
259 class ComputeNodeUpdateActor(config.actor_class):
260     """Actor to dispatch one-off cloud management requests.
261
262     This actor receives requests for small cloud updates, and
263     dispatches them to a real driver.  ComputeNodeMonitorActors use
264     this to perform maintenance tasks on themselves.  Having a
265     dedicated actor for this gives us the opportunity to control the
266     flow of requests; e.g., by backing off when errors occur.
267     """
268     def __init__(self, cloud_factory, max_retry_wait=180):
269         super(ComputeNodeUpdateActor, self).__init__()
270         self._cloud = cloud_factory()
271         self.max_retry_wait = max_retry_wait
272         self.error_streak = 0
273         self.next_request_time = time.time()
274
275     def _set_logger(self):
276         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
277
278     def on_start(self):
279         self._set_logger()
280
281     def _throttle_errors(orig_func):
282         @functools.wraps(orig_func)
283         def throttle_wrapper(self, *args, **kwargs):
284             throttle_time = self.next_request_time - time.time()
285             if throttle_time > 0:
286                 time.sleep(throttle_time)
287             self.next_request_time = time.time()
288             try:
289                 result = orig_func(self, *args, **kwargs)
290             except Exception as error:
291                 if self._cloud.is_cloud_exception(error):
292                     self.error_streak += 1
293                     self.next_request_time += min(2 ** self.error_streak,
294                                                   self.max_retry_wait)
295                 self._logger.warn(
296                     "Unhandled exception: %s", error, exc_info=error)
297             else:
298                 self.error_streak = 0
299                 return result
300         return throttle_wrapper
301
302     @_throttle_errors
303     def sync_node(self, cloud_node, arvados_node):
304         return self._cloud.sync_node(cloud_node, arvados_node)
305
306
307 class ComputeNodeMonitorActor(config.actor_class):
308     """Actor to manage a running compute node.
309
310     This actor gets updates about a compute node's cloud and Arvados records.
311     It uses this information to notify subscribers when the node is eligible
312     for shutdown.
313     """
314     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
315                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
316                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
317                  boot_fail_after=1800
318     ):
319         super(ComputeNodeMonitorActor, self).__init__()
320         self._later = self.actor_ref.tell_proxy()
321         self._shutdowns = shutdown_timer
322         self._cloud_node_fqdn = cloud_fqdn_func
323         self._timer = timer_actor
324         self._update = update_actor
325         self._cloud = cloud_client
326         self.cloud_node = cloud_node
327         self.cloud_node_start_time = cloud_node_start_time
328         self.poll_stale_after = poll_stale_after
329         self.node_stale_after = node_stale_after
330         self.boot_fail_after = boot_fail_after
331         self.subscribers = set()
332         self.arvados_node = None
333         self._later.update_arvados_node(arvados_node)
334         self.last_shutdown_opening = None
335         self._later.consider_shutdown()
336
337     def _set_logger(self):
338         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
339
340     def on_start(self):
341         self._set_logger()
342         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
343
344     def subscribe(self, subscriber):
345         self.subscribers.add(subscriber)
346
347     def _debug(self, msg, *args):
348         self._logger.debug(msg, *args)
349
350     def get_state(self):
351         """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
352
353         # If this node is not associated with an Arvados node, return 'unpaired'.
354         if self.arvados_node is None:
355             return 'unpaired'
356
357         state = self.arvados_node['crunch_worker_state']
358
359         # If state information is not available because it is missing or the
360         # record is stale, return 'down'.
361         if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
362                                             self.node_stale_after):
363             state = 'down'
364
365         # There's a window between when a node pings for the first time and the
366         # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
367         # window, the node will still report as 'down'.  Check that
368         # first_ping_at is truthy and consider the node 'idle' during the
369         # initial boot grace period.
370         if (state == 'down' and
371             self.arvados_node['first_ping_at'] and
372             timestamp_fresh(self.cloud_node_start_time,
373                             self.boot_fail_after) and
374             not self._cloud.broken(self.cloud_node)):
375             state = 'idle'
376
377         # "missing" means last_ping_at is stale, this should be
378         # considered "down"
379         if arvados_node_missing(self.arvados_node, self.node_stale_after):
380             state = 'down'
381
382         # Turns out using 'job_uuid' this way is a bad idea.  The node record
383         # is assigned the job_uuid before the job is locked (which removes it
384         # from the queue) which means the job will be double-counted as both in
385         # the wishlist and but also keeping a node busy.  This end result is
386         # excess nodes being booted.
387         #if state == 'idle' and self.arvados_node['job_uuid']:
388         #    state = 'busy'
389
390         return state
391
392     def in_state(self, *states):
393         return self.get_state() in states
394
395     def shutdown_eligible(self):
396         """Determine if node is candidate for shut down.
397
398         Returns a tuple of (boolean, string) where the first value is whether
399         the node is candidate for shut down, and the second value is the
400         reason for the decision.
401         """
402
403         # Collect states and then consult state transition table whether we
404         # should shut down.  Possible states are:
405         # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
406         # window = ["open", "closed"]
407         # boot_grace = ["boot wait", "boot exceeded"]
408         # idle_grace = ["not idle", "idle wait", "idle exceeded"]
409
410         if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
411             return (False, "node state is stale")
412
413         crunch_worker_state = self.get_state()
414
415         window = "open" if self._shutdowns.window_open() else "closed"
416
417         if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
418             boot_grace = "boot wait"
419         else:
420             boot_grace = "boot exceeded"
421
422         # API server side not implemented yet.
423         idle_grace = 'idle exceeded'
424
425         node_state = (crunch_worker_state, window, boot_grace, idle_grace)
426         t = transitions[node_state]
427         if t is not None:
428             # yes, shutdown eligible
429             return (True, "node state is %s" % (node_state,))
430         else:
431             # no, return a reason
432             return (False, "node state is %s" % (node_state,))
433
434     def consider_shutdown(self):
435         try:
436             eligible, reason = self.shutdown_eligible()
437             next_opening = self._shutdowns.next_opening()
438             if eligible:
439                 self._debug("Suggesting shutdown because %s", reason)
440                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
441             else:
442                 self._debug("Not eligible for shut down because %s", reason)
443
444                 if self.last_shutdown_opening != next_opening:
445                     self._debug("Shutdown window closed.  Next at %s.",
446                                 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
447                     self._timer.schedule(next_opening, self._later.consider_shutdown)
448                     self.last_shutdown_opening = next_opening
449         except Exception:
450             self._logger.exception("Unexpected exception")
451
452     def offer_arvados_pair(self, arvados_node):
453         first_ping_s = arvados_node.get('first_ping_at')
454         if (self.arvados_node is not None) or (not first_ping_s):
455             return None
456         elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
457               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
458             self._later.update_arvados_node(arvados_node)
459             return self.cloud_node.id
460         else:
461             return None
462
463     def update_cloud_node(self, cloud_node):
464         if cloud_node is not None:
465             self.cloud_node = cloud_node
466             self._later.consider_shutdown()
467
468     def update_arvados_node(self, arvados_node):
469         # If the cloud node's FQDN doesn't match what's in the Arvados node
470         # record, make them match.
471         # This method is a little unusual in the way it just fires off the
472         # request without checking the result or retrying errors.  That's
473         # because this update happens every time we reload the Arvados node
474         # list: if a previous sync attempt failed, we'll see that the names
475         # are out of sync and just try again.  ComputeNodeUpdateActor has
476         # the logic to throttle those effective retries when there's trouble.
477         if arvados_node is not None:
478             self.arvados_node = arvados_node
479             if (self._cloud_node_fqdn(self.cloud_node) !=
480                   arvados_node_fqdn(self.arvados_node)):
481                 self._update.sync_node(self.cloud_node, self.arvados_node)
482             self._later.consider_shutdown()