# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import absolute_import, print_function

import functools
import logging
import re
import time

import libcloud.common.types as cloud_types
from libcloud.common.exceptions import BaseHTTPError
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
from ... import status
from .transitions import transitions
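
# Sentinel value assigned to a setup actor's .error attribute when the cloud
# provider rejects create_node for what looks like a quota limit (see
# ComputeNodeSetupActor.create_cloud_node below).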
QuotaExceeded = "QuotaExceeded"


class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        self._later = self.actor_ref.tell_proxy()
        self._arvados = arvados_client
        self.subscribers = set()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _finished(self):
        if self.subscribers is None:
            raise Exception("Actor tried to finish twice")
        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
        self.subscribers = None
        self._logger.info("finished")

    def subscribe(self, subscriber):
        if self.subscribers is None:
            try:
                subscriber(self.actor_ref.proxy())
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'properties': {},
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()

    @staticmethod
    def _finish_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._finished()
        return finish_wrapper


class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        self.error = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(
            body={}, assign_slot=True).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self._clean_arvados_node(node, "Prepared by Node Manager")
        self.arvados_node = self._arvados.nodes().update(
            uuid=node['uuid'], body={}, assign_slot=True).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry()
    def create_cloud_node(self):
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.id)
        try:
            self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                      self.arvados_node)
        except BaseHTTPError as e:
            if e.code == 429 or "RequestLimitExceeded" in e.message:
                # Don't consider API rate limits to be quota errors.
                # Re-raise so the retry logic applies.
                raise

            # The set of possible error codes / messages isn't documented for
            # all clouds, so use a keyword heuristic to determine if the
            # failure is likely due to a quota.
            if re.search(r'(exceed|quota|limit)', e.message, re.I):
                self.error = QuotaExceeded
                self._logger.warning("Quota exceeded: %s", e)
                self._finished()
                return

            # Something else happened; re-raise so the retry logic applies.
            raise
        except Exception:
            raise

        # The information included in the node size object we get from libcloud
        # is inconsistent between cloud drivers.  Replace the libcloud NodeSize
        # object with a compatible CloudSizeWrapper object, which merges the
        # size info reported by the cloud with size information from the
        # configuration file.
        self.cloud_node.size = self.cloud_size

        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.
        """
        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            # created.
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        }
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
        ).execute()
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()

    @RetryMixin._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        if self.cloud_node is not None:
            return False
        self.stop()
        return True


class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    DESTROY_FAILED = "destroy_node failed"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        self.success = None

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason, **kwargs):
        if not self.cancellable:
            return False
        if self.cancel_reason is not None:
            # Already cancelled.
            return False
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)
        return True

    def _cancel_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._logger.debug("", exc_info=True)
                self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
        return finish_wrapper

    @_cancel_on_exception
    def shutdown_node(self):
        if self.cancel_reason is not None:
            # This shutdown was cancelled.
            return
        if self.cancellable:
            self._logger.info("Checking that node is still eligible for shutdown")
            eligible, reason = self._monitor.shutdown_eligible().get()
            if not eligible:
                self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
                                     try_resume=True)
                return
        # If boot failed, count the event.
        if self._monitor.get_state().get() == 'unpaired':
            status.tracker.counter_add('boot_failures')
        self._destroy_node()

    def _destroy_node(self):
        self._logger.info("Starting shutdown")
        arv_node = self._arvados_node()
        if self._cloud.destroy_node(self.cloud_node):
            self.cancellable = False
            self._logger.info("Shutdown success")
            if arv_node:
                self._later.clean_arvados_node(arv_node)
            else:
                self._finished(success_flag=True)
        else:
            self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)


class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.
    """
    def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        RetryMixin.__init__(self, 1, max_retry_wait,
                            None, cloud_factory(), timer_actor)
        self._cloud = cloud_factory()
        self._later = self.actor_ref.tell_proxy()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    @RetryMixin._retry()
    def sync_node(self, cloud_node, arvados_node):
        if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
            return self._cloud.sync_node(cloud_node, arvados_node)


class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800, consecutive_idle_count=0):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.tell_proxy()
        self._shutdowns = shutdown_timer
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self.consecutive_idle_count = consecutive_idle_count
        self.consecutive_idle = 0
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        self._set_logger()
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        self._logger.debug(msg, *args)
377 """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
379 # If this node is not associated with an Arvados node, return
380 # 'unpaired' if we're in the boot grace period, and 'down' if not,
381 # so it isn't counted towards usable nodes.
382 if self.arvados_node is None:
383 if timestamp_fresh(self.cloud_node_start_time,
384 self.boot_fail_after):
389 state = self.arvados_node['crunch_worker_state']
391 # If state information is not available because it is missing or the
392 # record is stale, return 'down'.
393 if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
394 self.node_stale_after):
397 # There's a window between when a node pings for the first time and the
398 # value of 'slurm_state' is synchronized by crunch-dispatch. In this
399 # window, the node will still report as 'down'. Check that
400 # first_ping_at is truthy and consider the node 'idle' during the
401 # initial boot grace period.
402 if (state == 'down' and
403 self.arvados_node['first_ping_at'] and
404 timestamp_fresh(self.cloud_node_start_time,
405 self.boot_fail_after) and
406 not self._cloud.broken(self.cloud_node)):
409 # "missing" means last_ping_at is stale, this should be
411 if arvados_node_missing(self.arvados_node, self.node_stale_after):
414 # Turns out using 'job_uuid' this way is a bad idea. The node record
415 # is assigned the job_uuid before the job is locked (which removes it
416 # from the queue) which means the job will be double-counted as both in
417 # the wishlist and but also keeping a node busy. This end result is
418 # excess nodes being booted.
419 #if state == 'idle' and self.arvados_node['job_uuid']:
422 # Update idle node times tracker
424 status.tracker.idle_in(self.arvados_node['hostname'])
426 status.tracker.idle_out(self.arvados_node['hostname'])

    def in_state(self, *states):
        return self.get_state() in states

    def shutdown_eligible(self):
        """Determine if the node is a candidate for shutdown.

        Returns a tuple of (boolean, string) where the first value is whether
        the node is a candidate for shutdown, and the second value is the
        reason for the decision.
        """

        # If this node's size is invalid (because it has a stale arvados_node_size
        # tag), return True so that it's properly shut down.
        if self.cloud_node.size.id == 'invalid':
            return (True, "node's size tag '%s' not recognizable" % (self.cloud_node.extra['arvados_node_size'],))

        # Collect the states, then consult the state transition table to
        # decide whether to shut down.  Possible states are:
        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
        # window = ["open", "closed"]
        # boot_grace = ["boot wait", "boot exceeded"]
        # idle_grace = ["not idle", "idle wait", "idle exceeded"]
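
        # For example, an idle node that has exhausted both grace periods
        # while the shutdown window is open is looked up as
        # transitions[('idle', 'open', 'boot exceeded', 'idle exceeded')];
        # a non-None entry in the table means the node may be shut down.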

        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return (False, "node state is stale")

        crunch_worker_state = self.get_state()

        window = "open" if self._shutdowns.window_open() else "closed"

        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
            boot_grace = "boot wait"
        else:
            boot_grace = "boot exceeded"

        if crunch_worker_state == "idle":
            # Must have reported as "idle" at least "consecutive_idle_count" times.
            if self.consecutive_idle < self.consecutive_idle_count:
                idle_grace = 'idle wait'
            else:
                idle_grace = 'idle exceeded'
        else:
            idle_grace = 'not idle'

        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
        t = transitions[node_state]
        if t is not None:
            # Yes, shutdown eligible.
            return (True, "node state is %s" % (node_state,))
        else:
            # No; return a reason.
            return (False, "node state is %s" % (node_state,))

    def consider_shutdown(self):
        try:
            eligible, reason = self.shutdown_eligible()
            next_opening = self._shutdowns.next_opening()
            if eligible:
                self._debug("Suggesting shutdown because %s", reason)
                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
            else:
                self._debug("Not eligible for shut down because %s", reason)

                if self.last_shutdown_opening != next_opening:
                    self._debug("Shutdown window closed.  Next at %s.",
                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                    self._timer.schedule(next_opening, self._later.consider_shutdown)
                    self.last_shutdown_opening = next_opening
        except Exception:
            self._logger.exception("Unexpected exception")

    def offer_arvados_pair(self, arvados_node):
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        """Called when the latest Arvados node record is retrieved.

        Calls the updater's sync_node() method.
        """
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            self._update.sync_node(self.cloud_node, self.arvados_node)
            if self.arvados_node['crunch_worker_state'] == "idle":
                self.consecutive_idle += 1
            else:
                self.consecutive_idle = 0
            self._later.consider_shutdown()
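

# Rough usage sketch (illustrative only -- the real wiring lives in the Node
# Manager daemon, and the arguments below stand in for real client objects):
#
#   ref = ComputeNodeSetupActor.start(timer_actor, arvados_client,
#                                     cloud_client, cloud_size)
#   ref.proxy().subscribe(lambda setup_proxy: ...)  # called when setup finishes
#
# Subscribers can then check the actor's .error attribute (e.g.
# setup_proxy.error.get() on a pykka proxy) for QuotaExceeded.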