15964: Remove qr1hi from a few more places. Delete unused includes.
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import functools
9 import logging
10 import time
11 import re
12
13 import libcloud.common.types as cloud_types
14 from libcloud.common.exceptions import BaseHTTPError
15
16 import pykka
17
18 from .. import \
19     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
20     arvados_node_missing, RetryMixin
21 from ...clientactor import _notify_subscribers
22 from ... import config
23 from ... import status
24 from .transitions import transitions
25
26 QuotaExceeded = "QuotaExceeded"
27
28 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
29     """Base class for actors that change a compute node's state.
30
31     This base class takes care of retrying changes and notifying
32     subscribers when the change is finished.
33     """
34     def __init__(self, cloud_client, arvados_client, timer_actor,
35                  retry_wait, max_retry_wait):
36         super(ComputeNodeStateChangeBase, self).__init__()
37         RetryMixin.__init__(self, retry_wait, max_retry_wait,
38                             None, cloud_client, timer_actor)
39         self._later = self.actor_ref.tell_proxy()
40         self._arvados = arvados_client
41         self.subscribers = set()
42
43     def _set_logger(self):
44         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
45
46     def on_start(self):
47         self._set_logger()
48
49     def _finished(self):
50         if self.subscribers is None:
51             raise Exception("Actor tried to finish twice")
52         _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
53         self.subscribers = None
54         self._logger.info("finished")
55
56     def subscribe(self, subscriber):
57         if self.subscribers is None:
58             try:
59                 subscriber(self.actor_ref.proxy())
60             except pykka.ActorDeadError:
61                 pass
62         else:
63             self.subscribers.add(subscriber)
64
65     def _clean_arvados_node(self, arvados_node, explanation):
66         return self._arvados.nodes().update(
67             uuid=arvados_node['uuid'],
68             body={'hostname': None,
69                   'ip_address': None,
70                   'slot_number': None,
71                   'first_ping_at': None,
72                   'last_ping_at': None,
73                   'properties': {},
74                   'info': {'ec2_instance_id': None,
75                            'last_action': explanation}},
76             ).execute()
77
78     @staticmethod
79     def _finish_on_exception(orig_func):
80         @functools.wraps(orig_func)
81         def finish_wrapper(self, *args, **kwargs):
82             try:
83                 return orig_func(self, *args, **kwargs)
84             except Exception as error:
85                 self._logger.error("Actor error %s", error)
86                 self._finished()
87         return finish_wrapper
88
89
90 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
91     """Actor to create and set up a cloud compute node.
92
93     This actor prepares an Arvados node record for a new compute node
94     (either creating one or cleaning one passed in), then boots the
95     actual compute node.  It notifies subscribers when the cloud node
96     is successfully created (the last step in the process for Node
97     Manager to handle).
98     """
99     def __init__(self, timer_actor, arvados_client, cloud_client,
100                  cloud_size, arvados_node=None,
101                  retry_wait=1, max_retry_wait=180):
102         super(ComputeNodeSetupActor, self).__init__(
103             cloud_client, arvados_client, timer_actor,
104             retry_wait, max_retry_wait)
105         self.cloud_size = cloud_size
106         self.arvados_node = None
107         self.cloud_node = None
108         self.error = None
109         if arvados_node is None:
110             self._later.create_arvados_node()
111         else:
112             self._later.prepare_arvados_node(arvados_node)
113
114     @ComputeNodeStateChangeBase._finish_on_exception
115     @RetryMixin._retry(config.ARVADOS_ERRORS)
116     def create_arvados_node(self):
117         self.arvados_node = self._arvados.nodes().create(
118             body={}, assign_slot=True).execute()
119         self._later.create_cloud_node()
120
121     @ComputeNodeStateChangeBase._finish_on_exception
122     @RetryMixin._retry(config.ARVADOS_ERRORS)
123     def prepare_arvados_node(self, node):
124         self._clean_arvados_node(node, "Prepared by Node Manager")
125         self.arvados_node = self._arvados.nodes().update(
126             uuid=node['uuid'], body={}, assign_slot=True).execute()
127         self._later.create_cloud_node()
128
129     @ComputeNodeStateChangeBase._finish_on_exception
130     @RetryMixin._retry()
131     def create_cloud_node(self):
132         self._logger.info("Sending create_node request for node size %s.",
133                           self.cloud_size.id)
134         try:
135             self.cloud_node = self._cloud.create_node(self.cloud_size,
136                                                       self.arvados_node)
137         except BaseHTTPError as e:
138             if e.code == 429 or "RequestLimitExceeded" in e.message:
139                 # Don't consider API rate limits to be quota errors.
140                 # re-raise so the Retry logic applies.
141                 raise
142
143             # The set of possible error codes / messages isn't documented for
144             # all clouds, so use a keyword heuristic to determine if the
145             # failure is likely due to a quota.
146             if re.search(r'(exceed|quota|limit)', e.message, re.I):
147                 self.error = QuotaExceeded
148                 self._logger.warning("Quota exceeded: %s", e)
149                 self._finished()
150                 return
151             else:
152                 # Something else happened, re-raise so the Retry logic applies.
153                 raise
154         except Exception as e:
155             raise
156
157         # The information included in the node size object we get from libcloud
158         # is inconsistent between cloud drivers.  Replace libcloud NodeSize
159         # object with compatible CloudSizeWrapper object which merges the size
160         # info reported from the cloud with size information from the
161         # configuration file.
162         self.cloud_node.size = self.cloud_size
163
164         self._logger.info("Cloud node %s created.", self.cloud_node.id)
165         self._later.update_arvados_node_properties()
166
167     @ComputeNodeStateChangeBase._finish_on_exception
168     @RetryMixin._retry(config.ARVADOS_ERRORS)
169     def update_arvados_node_properties(self):
170         """Tell Arvados some details about the cloud node.
171
172         Currently we only include size/price from our request, which
173         we already knew before create_cloud_node(), but doing it here
174         gives us an opportunity to provide more detail from
175         self.cloud_node, too.
176         """
177         self.arvados_node['properties']['cloud_node'] = {
178             # Note this 'size' is the node size we asked the cloud
179             # driver to create -- not necessarily equal to the size
180             # reported by the cloud driver for the node that was
181             # created.
182             'size': self.cloud_size.id,
183             'price': self.cloud_size.price,
184         }
185         self.arvados_node = self._arvados.nodes().update(
186             uuid=self.arvados_node['uuid'],
187             body={'properties': self.arvados_node['properties']},
188         ).execute()
189         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
190         self._later.post_create()
191
192     @RetryMixin._retry()
193     def post_create(self):
194         self._cloud.post_create_node(self.cloud_node)
195         self._logger.info("%s post-create work done.", self.cloud_node.id)
196         self._finished()
197
198     def stop_if_no_cloud_node(self):
199         if self.cloud_node is not None:
200             return False
201         self.stop()
202         return True
203
204
205 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
206     """Actor to shut down a compute node.
207
208     This actor simply destroys a cloud node, retrying as needed.
209     """
210     # Reasons for a shutdown to be cancelled.
211     WINDOW_CLOSED = "shutdown window closed"
212     DESTROY_FAILED = "destroy_node failed"
213
214     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
215                  cancellable=True, retry_wait=1, max_retry_wait=180):
216         # If a ShutdownActor is cancellable, it will ask the
217         # ComputeNodeMonitorActor if it's still eligible before taking each
218         # action, and stop the shutdown process if the node is no longer
219         # eligible.  Normal shutdowns based on job demand should be
220         # cancellable; shutdowns based on node misbehavior should not.
221         super(ComputeNodeShutdownActor, self).__init__(
222             cloud_client, arvados_client, timer_actor,
223             retry_wait, max_retry_wait)
224         self._monitor = node_monitor.proxy()
225         self.cloud_node = self._monitor.cloud_node.get()
226         self.cancellable = cancellable
227         self.cancel_reason = None
228         self.success = None
229
230     def _set_logger(self):
231         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
232
233     def on_start(self):
234         super(ComputeNodeShutdownActor, self).on_start()
235         self._later.shutdown_node()
236
237     def _arvados_node(self):
238         return self._monitor.arvados_node.get()
239
240     def _finished(self, success_flag=None):
241         if success_flag is not None:
242             self.success = success_flag
243         return super(ComputeNodeShutdownActor, self)._finished()
244
245     def cancel_shutdown(self, reason, **kwargs):
246         if not self.cancellable:
247             return False
248         if self.cancel_reason is not None:
249             # already cancelled
250             return False
251         self.cancel_reason = reason
252         self._logger.info("Shutdown cancelled: %s.", reason)
253         self._finished(success_flag=False)
254         return True
255
256     def _cancel_on_exception(orig_func):
257         @functools.wraps(orig_func)
258         def finish_wrapper(self, *args, **kwargs):
259             try:
260                 return orig_func(self, *args, **kwargs)
261             except Exception as error:
262                 self._logger.error("Actor error %s", error)
263                 self._logger.debug("", exc_info=True)
264                 self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
265         return finish_wrapper
266
267     @_cancel_on_exception
268     def shutdown_node(self):
269         if self.cancel_reason is not None:
270             # already cancelled
271             return
272         if self.cancellable:
273             self._logger.info("Checking that node is still eligible for shutdown")
274             eligible, reason = self._monitor.shutdown_eligible().get()
275             if not eligible:
276                 self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
277                                      try_resume=True)
278                 return
279         # If boot failed, count the event
280         if self._monitor.get_state().get() == 'unpaired':
281             status.tracker.counter_add('boot_failures')
282         self._destroy_node()
283
284     def _destroy_node(self):
285         self._logger.info("Starting shutdown")
286         arv_node = self._arvados_node()
287         if self._cloud.destroy_node(self.cloud_node):
288             self.cancellable = False
289             self._logger.info("Shutdown success")
290             if arv_node:
291                 self._later.clean_arvados_node(arv_node)
292             else:
293                 self._finished(success_flag=True)
294         else:
295             self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)
296
297     @ComputeNodeStateChangeBase._finish_on_exception
298     @RetryMixin._retry(config.ARVADOS_ERRORS)
299     def clean_arvados_node(self, arvados_node):
300         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
301         self._finished(success_flag=True)
302
303
304 class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
305     """Actor to dispatch one-off cloud management requests.
306
307     This actor receives requests for small cloud updates, and
308     dispatches them to a real driver.  ComputeNodeMonitorActors use
309     this to perform maintenance tasks on themselves.  Having a
310     dedicated actor for this gives us the opportunity to control the
311     flow of requests; e.g., by backing off when errors occur.
312     """
313     def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
314         super(ComputeNodeUpdateActor, self).__init__()
315         RetryMixin.__init__(self, 1, max_retry_wait,
316                             None, cloud_factory(), timer_actor)
317         self._cloud = cloud_factory()
318         self._later = self.actor_ref.tell_proxy()
319
320     def _set_logger(self):
321         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
322
323     def on_start(self):
324         self._set_logger()
325
326     @RetryMixin._retry()
327     def sync_node(self, cloud_node, arvados_node):
328         if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
329             return self._cloud.sync_node(cloud_node, arvados_node)
330
331
332 class ComputeNodeMonitorActor(config.actor_class):
333     """Actor to manage a running compute node.
334
335     This actor gets updates about a compute node's cloud and Arvados records.
336     It uses this information to notify subscribers when the node is eligible
337     for shutdown.
338     """
339     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
340                  timer_actor, update_actor, cloud_client,
341                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
342                  boot_fail_after=1800, consecutive_idle_count=0
343     ):
344         super(ComputeNodeMonitorActor, self).__init__()
345         self._later = self.actor_ref.tell_proxy()
346         self._shutdowns = shutdown_timer
347         self._timer = timer_actor
348         self._update = update_actor
349         self._cloud = cloud_client
350         self.cloud_node = cloud_node
351         self.cloud_node_start_time = cloud_node_start_time
352         self.poll_stale_after = poll_stale_after
353         self.node_stale_after = node_stale_after
354         self.boot_fail_after = boot_fail_after
355         self.subscribers = set()
356         self.arvados_node = None
357         self.consecutive_idle_count = consecutive_idle_count
358         self.consecutive_idle = 0
359         self._later.update_arvados_node(arvados_node)
360         self.last_shutdown_opening = None
361         self._later.consider_shutdown()
362
363     def _set_logger(self):
364         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
365
366     def on_start(self):
367         self._set_logger()
368         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
369
370     def subscribe(self, subscriber):
371         self.subscribers.add(subscriber)
372
373     def _debug(self, msg, *args):
374         self._logger.debug(msg, *args)
375
376     def get_state(self):
377         """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
378
379         # If this node is not associated with an Arvados node, return
380         # 'unpaired' if we're in the boot grace period, and 'down' if not,
381         # so it isn't counted towards usable nodes.
382         if self.arvados_node is None:
383             if timestamp_fresh(self.cloud_node_start_time,
384                                self.boot_fail_after):
385                 return 'unpaired'
386             else:
387                 return 'down'
388
389         state = self.arvados_node['crunch_worker_state']
390
391         # If state information is not available because it is missing or the
392         # record is stale, return 'down'.
393         if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
394                                             self.node_stale_after):
395             state = 'down'
396
397         # There's a window between when a node pings for the first time and the
398         # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
399         # window, the node will still report as 'down'.  Check that
400         # first_ping_at is truthy and consider the node 'idle' during the
401         # initial boot grace period.
402         if (state == 'down' and
403             self.arvados_node['first_ping_at'] and
404             timestamp_fresh(self.cloud_node_start_time,
405                             self.boot_fail_after) and
406             not self._cloud.broken(self.cloud_node)):
407             state = 'idle'
408
409         # "missing" means last_ping_at is stale, this should be
410         # considered "down"
411         if arvados_node_missing(self.arvados_node, self.node_stale_after):
412             state = 'down'
413
414         # Turns out using 'job_uuid' this way is a bad idea.  The node record
415         # is assigned the job_uuid before the job is locked (which removes it
416         # from the queue) which means the job will be double-counted as both in
417         # the wishlist and but also keeping a node busy.  This end result is
418         # excess nodes being booted.
419         #if state == 'idle' and self.arvados_node['job_uuid']:
420         #    state = 'busy'
421
422         # Update idle node times tracker
423         if state == 'idle':
424             status.tracker.idle_in(self.arvados_node['hostname'])
425         else:
426             status.tracker.idle_out(self.arvados_node['hostname'])
427
428         return state
429
430     def in_state(self, *states):
431         return self.get_state() in states
432
433     def shutdown_eligible(self):
434         """Determine if node is candidate for shut down.
435
436         Returns a tuple of (boolean, string) where the first value is whether
437         the node is candidate for shut down, and the second value is the
438         reason for the decision.
439         """
440
441         # If this node's size is invalid (because it has a stale arvados_node_size
442         # tag), return True so that it's properly shut down.
443         if self.cloud_node.size.id == 'invalid':
444             return (True, "node's size tag '%s' not recognizable" % (self.cloud_node.extra['arvados_node_size'],))
445
446         # Collect states and then consult state transition table whether we
447         # should shut down.  Possible states are:
448         # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
449         # window = ["open", "closed"]
450         # boot_grace = ["boot wait", "boot exceeded"]
451         # idle_grace = ["not idle", "idle wait", "idle exceeded"]
452
453         if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
454             return (False, "node state is stale")
455
456         crunch_worker_state = self.get_state()
457
458         window = "open" if self._shutdowns.window_open() else "closed"
459
460         if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
461             boot_grace = "boot wait"
462         else:
463             boot_grace = "boot exceeded"
464
465         if crunch_worker_state == "idle":
466             # Must report as "idle" at least "consecutive_idle_count" times
467             if self.consecutive_idle < self.consecutive_idle_count:
468                 idle_grace = 'idle wait'
469             else:
470                 idle_grace = 'idle exceeded'
471         else:
472             idle_grace = 'not idle'
473
474         node_state = (crunch_worker_state, window, boot_grace, idle_grace)
475         t = transitions[node_state]
476         if t is not None:
477             # yes, shutdown eligible
478             return (True, "node state is %s" % (node_state,))
479         else:
480             # no, return a reason
481             return (False, "node state is %s" % (node_state,))
482
483     def consider_shutdown(self):
484         try:
485             eligible, reason = self.shutdown_eligible()
486             next_opening = self._shutdowns.next_opening()
487             if eligible:
488                 self._debug("Suggesting shutdown because %s", reason)
489                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
490             else:
491                 self._debug("Not eligible for shut down because %s", reason)
492
493                 if self.last_shutdown_opening != next_opening:
494                     self._debug("Shutdown window closed.  Next at %s.",
495                                 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
496                     self._timer.schedule(next_opening, self._later.consider_shutdown)
497                     self.last_shutdown_opening = next_opening
498         except Exception:
499             self._logger.exception("Unexpected exception")
500
501     def offer_arvados_pair(self, arvados_node):
502         first_ping_s = arvados_node.get('first_ping_at')
503         if (self.arvados_node is not None) or (not first_ping_s):
504             return None
505         elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
506               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
507             self._later.update_arvados_node(arvados_node)
508             return self.cloud_node.id
509         else:
510             return None
511
512     def update_cloud_node(self, cloud_node):
513         if cloud_node is not None:
514             self.cloud_node = cloud_node
515             self._later.consider_shutdown()
516
517     def update_arvados_node(self, arvados_node):
518         """Called when the latest Arvados node record is retrieved.
519
520         Calls the updater's sync_node() method.
521
522         """
523         # This method is a little unusual in the way it just fires off the
524         # request without checking the result or retrying errors.  That's
525         # because this update happens every time we reload the Arvados node
526         # list: if a previous sync attempt failed, we'll see that the names
527         # are out of sync and just try again.  ComputeNodeUpdateActor has
528         # the logic to throttle those effective retries when there's trouble.
529         if arvados_node is not None:
530             self.arvados_node = arvados_node
531             self._update.sync_node(self.cloud_node, self.arvados_node)
532             if self.arvados_node['crunch_worker_state'] == "idle":
533                 self.consecutive_idle += 1
534             else:
535                 self.consecutive_idle = 0
536             self._later.consider_shutdown()