12199: Update slurm features when nodes come up.
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import functools
9 import logging
10 import time
11 import re
12
13 import libcloud.common.types as cloud_types
14 from libcloud.common.exceptions import BaseHTTPError
15
16 import pykka
17
18 from .. import \
19     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
20     arvados_node_missing, RetryMixin
21 from ...clientactor import _notify_subscribers
22 from ... import config
23 from .transitions import transitions
24
25 QuotaExceeded = "QuotaExceeded"
26
27 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
28     """Base class for actors that change a compute node's state.
29
30     This base class takes care of retrying changes and notifying
31     subscribers when the change is finished.
32     """
33     def __init__(self, cloud_client, arvados_client, timer_actor,
34                  retry_wait, max_retry_wait):
35         super(ComputeNodeStateChangeBase, self).__init__()
36         RetryMixin.__init__(self, retry_wait, max_retry_wait,
37                             None, cloud_client, timer_actor)
38         self._later = self.actor_ref.tell_proxy()
39         self._arvados = arvados_client
40         self.subscribers = set()
41
42     def _set_logger(self):
43         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
44
45     def on_start(self):
46         self._set_logger()
47
48     def _finished(self):
49         if self.subscribers is None:
50             raise Exception("Actor tried to finish twice")
51         _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
52         self.subscribers = None
53         self._logger.info("finished")
54
55     def subscribe(self, subscriber):
56         if self.subscribers is None:
57             try:
58                 subscriber(self.actor_ref.proxy())
59             except pykka.ActorDeadError:
60                 pass
61         else:
62             self.subscribers.add(subscriber)
63
64     def _clean_arvados_node(self, arvados_node, explanation):
65         return self._arvados.nodes().update(
66             uuid=arvados_node['uuid'],
67             body={'hostname': None,
68                   'ip_address': None,
69                   'slot_number': None,
70                   'first_ping_at': None,
71                   'last_ping_at': None,
72                   'properties': {},
73                   'info': {'ec2_instance_id': None,
74                            'last_action': explanation}},
75             ).execute()
76
77     @staticmethod
78     def _finish_on_exception(orig_func):
79         @functools.wraps(orig_func)
80         def finish_wrapper(self, *args, **kwargs):
81             try:
82                 return orig_func(self, *args, **kwargs)
83             except Exception as error:
84                 self._logger.error("Actor error %s", error)
85                 self._finished()
86         return finish_wrapper
87
88
89 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
90     """Actor to create and set up a cloud compute node.
91
92     This actor prepares an Arvados node record for a new compute node
93     (either creating one or cleaning one passed in), then boots the
94     actual compute node.  It notifies subscribers when the cloud node
95     is successfully created (the last step in the process for Node
96     Manager to handle).
97     """
98     def __init__(self, timer_actor, arvados_client, cloud_client,
99                  cloud_size, arvados_node=None,
100                  retry_wait=1, max_retry_wait=180):
101         super(ComputeNodeSetupActor, self).__init__(
102             cloud_client, arvados_client, timer_actor,
103             retry_wait, max_retry_wait)
104         self.cloud_size = cloud_size
105         self.arvados_node = None
106         self.cloud_node = None
107         self.error = None
108         if arvados_node is None:
109             self._later.create_arvados_node()
110         else:
111             self._later.prepare_arvados_node(arvados_node)
112
113     @ComputeNodeStateChangeBase._finish_on_exception
114     @RetryMixin._retry(config.ARVADOS_ERRORS)
115     def create_arvados_node(self):
116         self.arvados_node = self._arvados.nodes().create(body={}).execute()
117         self._later.create_cloud_node()
118
119     @ComputeNodeStateChangeBase._finish_on_exception
120     @RetryMixin._retry(config.ARVADOS_ERRORS)
121     def prepare_arvados_node(self, node):
122         self.arvados_node = self._clean_arvados_node(
123             node, "Prepared by Node Manager")
124         self._later.create_cloud_node()
125
126     @ComputeNodeStateChangeBase._finish_on_exception
127     @RetryMixin._retry()
128     def create_cloud_node(self):
129         self._logger.info("Sending create_node request for node size %s.",
130                           self.cloud_size.name)
131         try:
132             self.cloud_node = self._cloud.create_node(self.cloud_size,
133                                                       self.arvados_node)
134         except BaseHTTPError as e:
135             if e.code == 429 or "RequestLimitExceeded" in e.message:
136                 # Don't consider API rate limits to be quota errors.
137                 # re-raise so the Retry logic applies.
138                 raise
139
140             # The set of possible error codes / messages isn't documented for
141             # all clouds, so use a keyword heuristic to determine if the
142             # failure is likely due to a quota.
143             if re.search(r'(exceed|quota|limit)', e.message, re.I):
144                 self.error = QuotaExceeded
145                 self._logger.warning("Quota exceeded: %s", e)
146                 self._finished()
147                 return
148             else:
149                 # Something else happened, re-raise so the Retry logic applies.
150                 raise
151         except Exception as e:
152             raise
153
154         # The information included in the node size object we get from libcloud
155         # is inconsistent between cloud drivers.  Replace libcloud NodeSize
156         # object with compatible CloudSizeWrapper object which merges the size
157         # info reported from the cloud with size information from the
158         # configuration file.
159         self.cloud_node.size = self.cloud_size
160
161         self._logger.info("Cloud node %s created.", self.cloud_node.id)
162         self._later.update_arvados_node_properties()
163
164     @ComputeNodeStateChangeBase._finish_on_exception
165     @RetryMixin._retry(config.ARVADOS_ERRORS)
166     def update_arvados_node_properties(self):
167         """Tell Arvados some details about the cloud node.
168
169         Currently we only include size/price from our request, which
170         we already knew before create_cloud_node(), but doing it here
171         gives us an opportunity to provide more detail from
172         self.cloud_node, too.
173         """
174         self.arvados_node['properties']['cloud_node'] = {
175             # Note this 'size' is the node size we asked the cloud
176             # driver to create -- not necessarily equal to the size
177             # reported by the cloud driver for the node that was
178             # created.
179             'size': self.cloud_size.id,
180             'price': self.cloud_size.price,
181         }
182         self.arvados_node = self._arvados.nodes().update(
183             uuid=self.arvados_node['uuid'],
184             body={'properties': self.arvados_node['properties']},
185         ).execute()
186         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
187         self._later.post_create()
188
189     @RetryMixin._retry()
190     def post_create(self):
191         self._cloud.post_create_node(self.cloud_node)
192         self._logger.info("%s post-create work done.", self.cloud_node.id)
193         self._finished()
194
195     def stop_if_no_cloud_node(self):
196         if self.cloud_node is not None:
197             return False
198         self.stop()
199         return True
200
201
202 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
203     """Actor to shut down a compute node.
204
205     This actor simply destroys a cloud node, retrying as needed.
206     """
207     # Reasons for a shutdown to be cancelled.
208     WINDOW_CLOSED = "shutdown window closed"
209     DESTROY_FAILED = "destroy_node failed"
210
211     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
212                  cancellable=True, retry_wait=1, max_retry_wait=180):
213         # If a ShutdownActor is cancellable, it will ask the
214         # ComputeNodeMonitorActor if it's still eligible before taking each
215         # action, and stop the shutdown process if the node is no longer
216         # eligible.  Normal shutdowns based on job demand should be
217         # cancellable; shutdowns based on node misbehavior should not.
218         super(ComputeNodeShutdownActor, self).__init__(
219             cloud_client, arvados_client, timer_actor,
220             retry_wait, max_retry_wait)
221         self._monitor = node_monitor.proxy()
222         self.cloud_node = self._monitor.cloud_node.get()
223         self.cancellable = cancellable
224         self.cancel_reason = None
225         self.success = None
226
227     def _set_logger(self):
228         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
229
230     def on_start(self):
231         super(ComputeNodeShutdownActor, self).on_start()
232         self._later.shutdown_node()
233
234     def _arvados_node(self):
235         return self._monitor.arvados_node.get()
236
237     def _finished(self, success_flag=None):
238         if success_flag is not None:
239             self.success = success_flag
240         return super(ComputeNodeShutdownActor, self)._finished()
241
242     def cancel_shutdown(self, reason, **kwargs):
243         if self.cancel_reason is not None:
244             # already cancelled
245             return
246         self.cancel_reason = reason
247         self._logger.info("Shutdown cancelled: %s.", reason)
248         self._finished(success_flag=False)
249
250     def _cancel_on_exception(orig_func):
251         @functools.wraps(orig_func)
252         def finish_wrapper(self, *args, **kwargs):
253             try:
254                 return orig_func(self, *args, **kwargs)
255             except Exception as error:
256                 self._logger.error("Actor error %s", error)
257                 self._logger.debug("", exc_info=True)
258                 self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
259         return finish_wrapper
260
261     @_cancel_on_exception
262     def shutdown_node(self):
263         if self.cancel_reason is not None:
264             # already cancelled
265             return
266         if self.cancellable:
267             self._logger.info("Checking that node is still eligible for shutdown")
268             eligible, reason = self._monitor.shutdown_eligible().get()
269             if not eligible:
270                 self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
271                                      try_resume=True)
272                 return
273         self._destroy_node()
274
275     def _destroy_node(self):
276         self._logger.info("Starting shutdown")
277         arv_node = self._arvados_node()
278         if self._cloud.destroy_node(self.cloud_node):
279             self._logger.info("Shutdown success")
280             if arv_node:
281                 self._later.clean_arvados_node(arv_node)
282             else:
283                 self._finished(success_flag=True)
284         else:
285             self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)
286
287     @ComputeNodeStateChangeBase._finish_on_exception
288     @RetryMixin._retry(config.ARVADOS_ERRORS)
289     def clean_arvados_node(self, arvados_node):
290         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
291         self._finished(success_flag=True)
292
293
294 class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
295     """Actor to dispatch one-off cloud management requests.
296
297     This actor receives requests for small cloud updates, and
298     dispatches them to a real driver.  ComputeNodeMonitorActors use
299     this to perform maintenance tasks on themselves.  Having a
300     dedicated actor for this gives us the opportunity to control the
301     flow of requests; e.g., by backing off when errors occur.
302     """
303     def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
304         super(ComputeNodeUpdateActor, self).__init__()
305         RetryMixin.__init__(self, 1, max_retry_wait,
306                             None, cloud_factory(), timer_actor)
307         self._cloud = cloud_factory()
308         self._later = self.actor_ref.tell_proxy()
309
310     def _set_logger(self):
311         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
312
313     def on_start(self):
314         self._set_logger()
315
316     @RetryMixin._retry()
317     def sync_node(self, cloud_node, arvados_node):
318         return self._cloud.sync_node(cloud_node, arvados_node)
319
320
321 class ComputeNodeMonitorActor(config.actor_class):
322     """Actor to manage a running compute node.
323
324     This actor gets updates about a compute node's cloud and Arvados records.
325     It uses this information to notify subscribers when the node is eligible
326     for shutdown.
327     """
328     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
329                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
330                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
331                  boot_fail_after=1800
332     ):
333         super(ComputeNodeMonitorActor, self).__init__()
334         self._later = self.actor_ref.tell_proxy()
335         self._shutdowns = shutdown_timer
336         self._cloud_node_fqdn = cloud_fqdn_func
337         self._timer = timer_actor
338         self._update = update_actor
339         self._cloud = cloud_client
340         self.cloud_node = cloud_node
341         self.cloud_node_start_time = cloud_node_start_time
342         self.poll_stale_after = poll_stale_after
343         self.node_stale_after = node_stale_after
344         self.boot_fail_after = boot_fail_after
345         self.subscribers = set()
346         self.arvados_node = None
347         self._later.update_arvados_node(arvados_node)
348         self.last_shutdown_opening = None
349         self._later.consider_shutdown()
350
351     def _set_logger(self):
352         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
353
354     def on_start(self):
355         self._set_logger()
356         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
357
358     def subscribe(self, subscriber):
359         self.subscribers.add(subscriber)
360
361     def _debug(self, msg, *args):
362         self._logger.debug(msg, *args)
363
364     def get_state(self):
365         """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
366
367         # If this node is not associated with an Arvados node, return
368         # 'unpaired' if we're in the boot grace period, and 'down' if not,
369         # so it isn't counted towards usable nodes.
370         if self.arvados_node is None:
371             if timestamp_fresh(self.cloud_node_start_time,
372                                self.boot_fail_after):
373                 return 'unpaired'
374             else:
375                 return 'down'
376
377         state = self.arvados_node['crunch_worker_state']
378
379         # If state information is not available because it is missing or the
380         # record is stale, return 'down'.
381         if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
382                                             self.node_stale_after):
383             state = 'down'
384
385         # There's a window between when a node pings for the first time and the
386         # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
387         # window, the node will still report as 'down'.  Check that
388         # first_ping_at is truthy and consider the node 'idle' during the
389         # initial boot grace period.
390         if (state == 'down' and
391             self.arvados_node['first_ping_at'] and
392             timestamp_fresh(self.cloud_node_start_time,
393                             self.boot_fail_after) and
394             not self._cloud.broken(self.cloud_node)):
395             state = 'idle'
396
397         # "missing" means last_ping_at is stale, this should be
398         # considered "down"
399         if arvados_node_missing(self.arvados_node, self.node_stale_after):
400             state = 'down'
401
402         # Turns out using 'job_uuid' this way is a bad idea.  The node record
403         # is assigned the job_uuid before the job is locked (which removes it
404         # from the queue) which means the job will be double-counted as both in
405         # the wishlist and but also keeping a node busy.  This end result is
406         # excess nodes being booted.
407         #if state == 'idle' and self.arvados_node['job_uuid']:
408         #    state = 'busy'
409
410         return state
411
412     def in_state(self, *states):
413         return self.get_state() in states
414
415     def shutdown_eligible(self):
416         """Determine if node is candidate for shut down.
417
418         Returns a tuple of (boolean, string) where the first value is whether
419         the node is candidate for shut down, and the second value is the
420         reason for the decision.
421         """
422
423         # Collect states and then consult state transition table whether we
424         # should shut down.  Possible states are:
425         # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
426         # window = ["open", "closed"]
427         # boot_grace = ["boot wait", "boot exceeded"]
428         # idle_grace = ["not idle", "idle wait", "idle exceeded"]
429
430         if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
431             return (False, "node state is stale")
432
433         crunch_worker_state = self.get_state()
434
435         window = "open" if self._shutdowns.window_open() else "closed"
436
437         if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
438             boot_grace = "boot wait"
439         else:
440             boot_grace = "boot exceeded"
441
442         # API server side not implemented yet.
443         idle_grace = 'idle exceeded'
444
445         node_state = (crunch_worker_state, window, boot_grace, idle_grace)
446         t = transitions[node_state]
447         if t is not None:
448             # yes, shutdown eligible
449             return (True, "node state is %s" % (node_state,))
450         else:
451             # no, return a reason
452             return (False, "node state is %s" % (node_state,))
453
454     def consider_shutdown(self):
455         try:
456             eligible, reason = self.shutdown_eligible()
457             next_opening = self._shutdowns.next_opening()
458             if eligible:
459                 self._debug("Suggesting shutdown because %s", reason)
460                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
461             else:
462                 self._debug("Not eligible for shut down because %s", reason)
463
464                 if self.last_shutdown_opening != next_opening:
465                     self._debug("Shutdown window closed.  Next at %s.",
466                                 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
467                     self._timer.schedule(next_opening, self._later.consider_shutdown)
468                     self.last_shutdown_opening = next_opening
469         except Exception:
470             self._logger.exception("Unexpected exception")
471
472     def offer_arvados_pair(self, arvados_node):
473         first_ping_s = arvados_node.get('first_ping_at')
474         if (self.arvados_node is not None) or (not first_ping_s):
475             return None
476         elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
477               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
478             self._later.update_arvados_node(arvados_node)
479             return self.cloud_node.id
480         else:
481             return None
482
483     def update_cloud_node(self, cloud_node):
484         if cloud_node is not None:
485             self.cloud_node = cloud_node
486             self._later.consider_shutdown()
487
488     def update_arvados_node(self, arvados_node):
489         # If the cloud node's FQDN doesn't match what's in the Arvados node
490         # record, make them match.
491         # This method is a little unusual in the way it just fires off the
492         # request without checking the result or retrying errors.  That's
493         # because this update happens every time we reload the Arvados node
494         # list: if a previous sync attempt failed, we'll see that the names
495         # are out of sync and just try again.  ComputeNodeUpdateActor has
496         # the logic to throttle those effective retries when there's trouble.
497         if arvados_node is not None:
498             self.arvados_node = arvados_node
499             if (self._cloud_node_fqdn(self.cloud_node) !=
500                   arvados_node_fqdn(self.arvados_node)):
501                 self._update.sync_node(self.cloud_node, self.arvados_node)
502             self._later.consider_shutdown()