Merge branch 'wtsi/13093-crunch-dispatch-slurm-add-mem' refs #13093
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import functools
9 import logging
10 import time
11 import re
12
13 import libcloud.common.types as cloud_types
14 from libcloud.common.exceptions import BaseHTTPError
15
16 import pykka
17
18 from .. import \
19     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
20     arvados_node_missing, RetryMixin
21 from ...clientactor import _notify_subscribers
22 from ... import config
23 from .transitions import transitions
24
25 QuotaExceeded = "QuotaExceeded"
26
27 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
28     """Base class for actors that change a compute node's state.
29
30     This base class takes care of retrying changes and notifying
31     subscribers when the change is finished.
32     """
33     def __init__(self, cloud_client, arvados_client, timer_actor,
34                  retry_wait, max_retry_wait):
35         super(ComputeNodeStateChangeBase, self).__init__()
36         RetryMixin.__init__(self, retry_wait, max_retry_wait,
37                             None, cloud_client, timer_actor)
38         self._later = self.actor_ref.tell_proxy()
39         self._arvados = arvados_client
40         self.subscribers = set()
41
42     def _set_logger(self):
43         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
44
45     def on_start(self):
46         self._set_logger()
47
48     def _finished(self):
49         if self.subscribers is None:
50             raise Exception("Actor tried to finish twice")
51         _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
52         self.subscribers = None
53         self._logger.info("finished")
54
55     def subscribe(self, subscriber):
56         if self.subscribers is None:
57             try:
58                 subscriber(self.actor_ref.proxy())
59             except pykka.ActorDeadError:
60                 pass
61         else:
62             self.subscribers.add(subscriber)
63
64     def _clean_arvados_node(self, arvados_node, explanation):
65         return self._arvados.nodes().update(
66             uuid=arvados_node['uuid'],
67             body={'hostname': None,
68                   'ip_address': None,
69                   'slot_number': None,
70                   'first_ping_at': None,
71                   'last_ping_at': None,
72                   'properties': {},
73                   'info': {'ec2_instance_id': None,
74                            'last_action': explanation}},
75             ).execute()
76
77     @staticmethod
78     def _finish_on_exception(orig_func):
79         @functools.wraps(orig_func)
80         def finish_wrapper(self, *args, **kwargs):
81             try:
82                 return orig_func(self, *args, **kwargs)
83             except Exception as error:
84                 self._logger.error("Actor error %s", error)
85                 self._finished()
86         return finish_wrapper
87
88
89 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
90     """Actor to create and set up a cloud compute node.
91
92     This actor prepares an Arvados node record for a new compute node
93     (either creating one or cleaning one passed in), then boots the
94     actual compute node.  It notifies subscribers when the cloud node
95     is successfully created (the last step in the process for Node
96     Manager to handle).
97     """
98     def __init__(self, timer_actor, arvados_client, cloud_client,
99                  cloud_size, arvados_node=None,
100                  retry_wait=1, max_retry_wait=180):
101         super(ComputeNodeSetupActor, self).__init__(
102             cloud_client, arvados_client, timer_actor,
103             retry_wait, max_retry_wait)
104         self.cloud_size = cloud_size
105         self.arvados_node = None
106         self.cloud_node = None
107         self.error = None
108         if arvados_node is None:
109             self._later.create_arvados_node()
110         else:
111             self._later.prepare_arvados_node(arvados_node)
112
113     @ComputeNodeStateChangeBase._finish_on_exception
114     @RetryMixin._retry(config.ARVADOS_ERRORS)
115     def create_arvados_node(self):
116         self.arvados_node = self._arvados.nodes().create(
117             body={}, assign_slot=True).execute()
118         self._later.create_cloud_node()
119
120     @ComputeNodeStateChangeBase._finish_on_exception
121     @RetryMixin._retry(config.ARVADOS_ERRORS)
122     def prepare_arvados_node(self, node):
123         self._clean_arvados_node(node, "Prepared by Node Manager")
124         self.arvados_node = self._arvados.nodes().update(
125             uuid=node['uuid'], body={}, assign_slot=True).execute()
126         self._later.create_cloud_node()
127
128     @ComputeNodeStateChangeBase._finish_on_exception
129     @RetryMixin._retry()
130     def create_cloud_node(self):
131         self._logger.info("Sending create_node request for node size %s.",
132                           self.cloud_size.name)
133         try:
134             self.cloud_node = self._cloud.create_node(self.cloud_size,
135                                                       self.arvados_node)
136         except BaseHTTPError as e:
137             if e.code == 429 or "RequestLimitExceeded" in e.message:
138                 # Don't consider API rate limits to be quota errors.
139                 # re-raise so the Retry logic applies.
140                 raise
141
142             # The set of possible error codes / messages isn't documented for
143             # all clouds, so use a keyword heuristic to determine if the
144             # failure is likely due to a quota.
145             if re.search(r'(exceed|quota|limit)', e.message, re.I):
146                 self.error = QuotaExceeded
147                 self._logger.warning("Quota exceeded: %s", e)
148                 self._finished()
149                 return
150             else:
151                 # Something else happened, re-raise so the Retry logic applies.
152                 raise
153         except Exception as e:
154             raise
155
156         # The information included in the node size object we get from libcloud
157         # is inconsistent between cloud drivers.  Replace libcloud NodeSize
158         # object with compatible CloudSizeWrapper object which merges the size
159         # info reported from the cloud with size information from the
160         # configuration file.
161         self.cloud_node.size = self.cloud_size
162
163         self._logger.info("Cloud node %s created.", self.cloud_node.id)
164         self._later.update_arvados_node_properties()
165
166     @ComputeNodeStateChangeBase._finish_on_exception
167     @RetryMixin._retry(config.ARVADOS_ERRORS)
168     def update_arvados_node_properties(self):
169         """Tell Arvados some details about the cloud node.
170
171         Currently we only include size/price from our request, which
172         we already knew before create_cloud_node(), but doing it here
173         gives us an opportunity to provide more detail from
174         self.cloud_node, too.
175         """
176         self.arvados_node['properties']['cloud_node'] = {
177             # Note this 'size' is the node size we asked the cloud
178             # driver to create -- not necessarily equal to the size
179             # reported by the cloud driver for the node that was
180             # created.
181             'size': self.cloud_size.id,
182             'price': self.cloud_size.price,
183         }
184         self.arvados_node = self._arvados.nodes().update(
185             uuid=self.arvados_node['uuid'],
186             body={'properties': self.arvados_node['properties']},
187         ).execute()
188         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
189         self._later.post_create()
190
191     @RetryMixin._retry()
192     def post_create(self):
193         self._cloud.post_create_node(self.cloud_node)
194         self._logger.info("%s post-create work done.", self.cloud_node.id)
195         self._finished()
196
197     def stop_if_no_cloud_node(self):
198         if self.cloud_node is not None:
199             return False
200         self.stop()
201         return True
202
203
204 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
205     """Actor to shut down a compute node.
206
207     This actor simply destroys a cloud node, retrying as needed.
208     """
209     # Reasons for a shutdown to be cancelled.
210     WINDOW_CLOSED = "shutdown window closed"
211     DESTROY_FAILED = "destroy_node failed"
212
213     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
214                  cancellable=True, retry_wait=1, max_retry_wait=180):
215         # If a ShutdownActor is cancellable, it will ask the
216         # ComputeNodeMonitorActor if it's still eligible before taking each
217         # action, and stop the shutdown process if the node is no longer
218         # eligible.  Normal shutdowns based on job demand should be
219         # cancellable; shutdowns based on node misbehavior should not.
220         super(ComputeNodeShutdownActor, self).__init__(
221             cloud_client, arvados_client, timer_actor,
222             retry_wait, max_retry_wait)
223         self._monitor = node_monitor.proxy()
224         self.cloud_node = self._monitor.cloud_node.get()
225         self.cancellable = cancellable
226         self.cancel_reason = None
227         self.success = None
228
229     def _set_logger(self):
230         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
231
232     def on_start(self):
233         super(ComputeNodeShutdownActor, self).on_start()
234         self._later.shutdown_node()
235
236     def _arvados_node(self):
237         return self._monitor.arvados_node.get()
238
239     def _finished(self, success_flag=None):
240         if success_flag is not None:
241             self.success = success_flag
242         return super(ComputeNodeShutdownActor, self)._finished()
243
244     def cancel_shutdown(self, reason, **kwargs):
245         if self.cancel_reason is not None:
246             # already cancelled
247             return
248         self.cancel_reason = reason
249         self._logger.info("Shutdown cancelled: %s.", reason)
250         self._finished(success_flag=False)
251
252     def _cancel_on_exception(orig_func):
253         @functools.wraps(orig_func)
254         def finish_wrapper(self, *args, **kwargs):
255             try:
256                 return orig_func(self, *args, **kwargs)
257             except Exception as error:
258                 self._logger.error("Actor error %s", error)
259                 self._logger.debug("", exc_info=True)
260                 self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
261         return finish_wrapper
262
263     @_cancel_on_exception
264     def shutdown_node(self):
265         if self.cancel_reason is not None:
266             # already cancelled
267             return
268         if self.cancellable:
269             self._logger.info("Checking that node is still eligible for shutdown")
270             eligible, reason = self._monitor.shutdown_eligible().get()
271             if not eligible:
272                 self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
273                                      try_resume=True)
274                 return
275         self._destroy_node()
276
277     def _destroy_node(self):
278         self._logger.info("Starting shutdown")
279         arv_node = self._arvados_node()
280         if self._cloud.destroy_node(self.cloud_node):
281             self._logger.info("Shutdown success")
282             if arv_node:
283                 self._later.clean_arvados_node(arv_node)
284             else:
285                 self._finished(success_flag=True)
286         else:
287             self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)
288
289     @ComputeNodeStateChangeBase._finish_on_exception
290     @RetryMixin._retry(config.ARVADOS_ERRORS)
291     def clean_arvados_node(self, arvados_node):
292         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
293         self._finished(success_flag=True)
294
295
296 class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
297     """Actor to dispatch one-off cloud management requests.
298
299     This actor receives requests for small cloud updates, and
300     dispatches them to a real driver.  ComputeNodeMonitorActors use
301     this to perform maintenance tasks on themselves.  Having a
302     dedicated actor for this gives us the opportunity to control the
303     flow of requests; e.g., by backing off when errors occur.
304     """
305     def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
306         super(ComputeNodeUpdateActor, self).__init__()
307         RetryMixin.__init__(self, 1, max_retry_wait,
308                             None, cloud_factory(), timer_actor)
309         self._cloud = cloud_factory()
310         self._later = self.actor_ref.tell_proxy()
311
312     def _set_logger(self):
313         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
314
315     def on_start(self):
316         self._set_logger()
317
318     @RetryMixin._retry()
319     def sync_node(self, cloud_node, arvados_node):
320         if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
321             return self._cloud.sync_node(cloud_node, arvados_node)
322
323
324 class ComputeNodeMonitorActor(config.actor_class):
325     """Actor to manage a running compute node.
326
327     This actor gets updates about a compute node's cloud and Arvados records.
328     It uses this information to notify subscribers when the node is eligible
329     for shutdown.
330     """
331     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
332                  timer_actor, update_actor, cloud_client,
333                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
334                  boot_fail_after=1800
335     ):
336         super(ComputeNodeMonitorActor, self).__init__()
337         self._later = self.actor_ref.tell_proxy()
338         self._shutdowns = shutdown_timer
339         self._timer = timer_actor
340         self._update = update_actor
341         self._cloud = cloud_client
342         self.cloud_node = cloud_node
343         self.cloud_node_start_time = cloud_node_start_time
344         self.poll_stale_after = poll_stale_after
345         self.node_stale_after = node_stale_after
346         self.boot_fail_after = boot_fail_after
347         self.subscribers = set()
348         self.arvados_node = None
349         self._later.update_arvados_node(arvados_node)
350         self.last_shutdown_opening = None
351         self._later.consider_shutdown()
352
353     def _set_logger(self):
354         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
355
356     def on_start(self):
357         self._set_logger()
358         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
359
360     def subscribe(self, subscriber):
361         self.subscribers.add(subscriber)
362
363     def _debug(self, msg, *args):
364         self._logger.debug(msg, *args)
365
366     def get_state(self):
367         """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
368
369         # If this node is not associated with an Arvados node, return
370         # 'unpaired' if we're in the boot grace period, and 'down' if not,
371         # so it isn't counted towards usable nodes.
372         if self.arvados_node is None:
373             if timestamp_fresh(self.cloud_node_start_time,
374                                self.boot_fail_after):
375                 return 'unpaired'
376             else:
377                 return 'down'
378
379         state = self.arvados_node['crunch_worker_state']
380
381         # If state information is not available because it is missing or the
382         # record is stale, return 'down'.
383         if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
384                                             self.node_stale_after):
385             state = 'down'
386
387         # There's a window between when a node pings for the first time and the
388         # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
389         # window, the node will still report as 'down'.  Check that
390         # first_ping_at is truthy and consider the node 'idle' during the
391         # initial boot grace period.
392         if (state == 'down' and
393             self.arvados_node['first_ping_at'] and
394             timestamp_fresh(self.cloud_node_start_time,
395                             self.boot_fail_after) and
396             not self._cloud.broken(self.cloud_node)):
397             state = 'idle'
398
399         # "missing" means last_ping_at is stale, this should be
400         # considered "down"
401         if arvados_node_missing(self.arvados_node, self.node_stale_after):
402             state = 'down'
403
404         # Turns out using 'job_uuid' this way is a bad idea.  The node record
405         # is assigned the job_uuid before the job is locked (which removes it
406         # from the queue) which means the job will be double-counted as both in
407         # the wishlist and but also keeping a node busy.  This end result is
408         # excess nodes being booted.
409         #if state == 'idle' and self.arvados_node['job_uuid']:
410         #    state = 'busy'
411
412         return state
413
414     def in_state(self, *states):
415         return self.get_state() in states
416
417     def shutdown_eligible(self):
418         """Determine if node is candidate for shut down.
419
420         Returns a tuple of (boolean, string) where the first value is whether
421         the node is candidate for shut down, and the second value is the
422         reason for the decision.
423         """
424
425         # Collect states and then consult state transition table whether we
426         # should shut down.  Possible states are:
427         # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
428         # window = ["open", "closed"]
429         # boot_grace = ["boot wait", "boot exceeded"]
430         # idle_grace = ["not idle", "idle wait", "idle exceeded"]
431
432         if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
433             return (False, "node state is stale")
434
435         crunch_worker_state = self.get_state()
436
437         window = "open" if self._shutdowns.window_open() else "closed"
438
439         if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
440             boot_grace = "boot wait"
441         else:
442             boot_grace = "boot exceeded"
443
444         # API server side not implemented yet.
445         idle_grace = 'idle exceeded'
446
447         node_state = (crunch_worker_state, window, boot_grace, idle_grace)
448         t = transitions[node_state]
449         if t is not None:
450             # yes, shutdown eligible
451             return (True, "node state is %s" % (node_state,))
452         else:
453             # no, return a reason
454             return (False, "node state is %s" % (node_state,))
455
456     def consider_shutdown(self):
457         try:
458             eligible, reason = self.shutdown_eligible()
459             next_opening = self._shutdowns.next_opening()
460             if eligible:
461                 self._debug("Suggesting shutdown because %s", reason)
462                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
463             else:
464                 self._debug("Not eligible for shut down because %s", reason)
465
466                 if self.last_shutdown_opening != next_opening:
467                     self._debug("Shutdown window closed.  Next at %s.",
468                                 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
469                     self._timer.schedule(next_opening, self._later.consider_shutdown)
470                     self.last_shutdown_opening = next_opening
471         except Exception:
472             self._logger.exception("Unexpected exception")
473
474     def offer_arvados_pair(self, arvados_node):
475         first_ping_s = arvados_node.get('first_ping_at')
476         if (self.arvados_node is not None) or (not first_ping_s):
477             return None
478         elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
479               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
480             self._later.update_arvados_node(arvados_node)
481             return self.cloud_node.id
482         else:
483             return None
484
485     def update_cloud_node(self, cloud_node):
486         if cloud_node is not None:
487             self.cloud_node = cloud_node
488             self._later.consider_shutdown()
489
490     def update_arvados_node(self, arvados_node):
491         """Called when the latest Arvados node record is retrieved.
492
493         Calls the updater's sync_node() method.
494
495         """
496         # This method is a little unusual in the way it just fires off the
497         # request without checking the result or retrying errors.  That's
498         # because this update happens every time we reload the Arvados node
499         # list: if a previous sync attempt failed, we'll see that the names
500         # are out of sync and just try again.  ComputeNodeUpdateActor has
501         # the logic to throttle those effective retries when there's trouble.
502         if arvados_node is not None:
503             self.arvados_node = arvados_node
504             self._update.sync_node(self.cloud_node, self.arvados_node)
505             self._later.consider_shutdown()