# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import absolute_import, print_function

import functools
import logging
import re
import time

import libcloud.common.types as cloud_types
from libcloud.common.exceptions import BaseHTTPError
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
from .transitions import transitions

QuotaExceeded = "QuotaExceeded"

class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        self._later = self.actor_ref.tell_proxy()
        self._arvados = arvados_client
        self.subscribers = set()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _finished(self):
        if self.subscribers is None:
            raise Exception("Actor tried to finish twice")
        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
        self.subscribers = None
        self._logger.info("finished")

    def subscribe(self, subscriber):
        if self.subscribers is None:
            try:
                subscriber(self.actor_ref.proxy())
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'properties': {},
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()

    @staticmethod
    def _finish_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._finished()
        return finish_wrapper
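
# --- Illustrative sketch (not part of the original module) ---------------
# A minimal, pykka-free model of the subscribe/_finished contract above:
# every subscriber registered before the change completes is notified
# exactly once, a late subscriber is called back immediately, and
# finishing twice is an error.  The _ToyStateChange name is hypothetical
# and exists only to demonstrate the pattern.
def _subscription_demo():
    class _ToyStateChange(object):
        def __init__(self):
            self.subscribers = set()

        def subscribe(self, subscriber):
            if self.subscribers is None:
                # Change already finished: call the subscriber right away.
                subscriber(self)
            else:
                self.subscribers.add(subscriber)

        def finished(self):
            if self.subscribers is None:
                raise Exception("Actor tried to finish twice")
            for subscriber in self.subscribers:
                subscriber(self)
            self.subscribers = None

    notified = []
    change = _ToyStateChange()
    change.subscribe(notified.append)   # registered before completion
    change.finished()                   # notifies the early subscriber
    change.subscribe(notified.append)   # late subscriber, called back now
    return len(notified)                # 2
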
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        self.error = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(
            body={}, assign_slot=True).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self._clean_arvados_node(node, "Prepared by Node Manager")
        self.arvados_node = self._arvados.nodes().update(
            uuid=node['uuid'], body={}, assign_slot=True).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry()
    def create_cloud_node(self):
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.name)
        try:
            self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                      self.arvados_node)
        except BaseHTTPError as e:
            if e.code == 429 or "RequestLimitExceeded" in e.message:
                # Don't consider API rate limits to be quota errors;
                # re-raise so the retry logic applies.
                raise

            # The set of possible error codes / messages isn't documented for
            # all clouds, so use a keyword heuristic to determine if the
            # failure is likely due to a quota.
            if re.search(r'(exceed|quota|limit)', e.message, re.I):
                self.error = QuotaExceeded
                self._logger.warning("Quota exceeded: %s", e)
                self._finished()
                return
            else:
                # Something else happened; re-raise so the retry logic applies.
                raise
        except Exception as e:
            raise

        # The information included in the node size object we get from libcloud
        # is inconsistent between cloud drivers.  Replace the libcloud NodeSize
        # object with a compatible CloudSizeWrapper object, which merges the
        # size info reported from the cloud with size information from the
        # configuration file.
        self.cloud_node.size = self.cloud_size

        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.
        """
        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            # created.
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        }
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
        ).execute()
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()

    @RetryMixin._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        if self.cloud_node is not None:
            return False
        self.stop()
        return True
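
# --- Illustrative sketch (not part of the original module) ---------------
# ComputeNodeSetupActor chains its steps by messaging its own proxy
# (create_arvados_node -> create_cloud_node ->
# update_arvados_node_properties -> post_create), so each step runs as a
# separate actor message and is retried independently.  A hypothetical
# subscriber inspecting the finished actor might look like this;
# `setup_proxy` is assumed to be a pykka proxy for a ComputeNodeSetupActor,
# where plain attribute access returns futures.
def _example_setup_subscriber(setup_proxy):
    if setup_proxy.error.get() == QuotaExceeded:
        # Cloud quota hit: the caller should stop booting nodes for now.
        return None
    # Otherwise the last step succeeded and cloud_node is populated.
    return setup_proxy.cloud_node.get()
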
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    DESTROY_FAILED = "destroy_node failed"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        self.success = None

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason, **kwargs):
        if self.cancel_reason is not None:
            # already cancelled
            return
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)

    def _cancel_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._logger.debug("", exc_info=True)
                self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
        return finish_wrapper

    @_cancel_on_exception
    def shutdown_node(self):
        if self.cancel_reason is not None:
            # already cancelled
            return
        if self.cancellable:
            self._logger.info("Checking that node is still eligible for shutdown")
            eligible, reason = self._monitor.shutdown_eligible().get()
            if not eligible:
                self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
                                     try_resume=True)
                return
        self._destroy_node()

    def _destroy_node(self):
        self._logger.info("Starting shutdown")
        arv_node = self._arvados_node()
        if self._cloud.destroy_node(self.cloud_node):
            self._logger.info("Shutdown success")
            if arv_node:
                self._later.clean_arvados_node(arv_node)
            else:
                self._finished(success_flag=True)
        else:
            self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)
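
# --- Illustrative sketch (not part of the original module) ---------------
# A pykka-free model of the _cancel_on_exception wrapper above: any
# unhandled error in a shutdown step is converted into a cancellation
# rather than crashing the actor.  All names below are hypothetical.
def _cancel_on_exception_demo():
    state = {'cancel_reason': None}

    def cancel_shutdown(reason):
        # First cancellation wins, mirroring cancel_shutdown() above.
        if state['cancel_reason'] is None:
            state['cancel_reason'] = reason

    def cancel_on_exception(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as error:
                cancel_shutdown("Unhandled exception %s" % error)
        return wrapper

    @cancel_on_exception
    def shutdown_step():
        raise RuntimeError("cloud API hiccup")

    shutdown_step()
    return state['cancel_reason']  # "Unhandled exception cloud API hiccup"
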
class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.
    """
    def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        RetryMixin.__init__(self, 1, max_retry_wait,
                            None, cloud_factory(), timer_actor)
        self._cloud = cloud_factory()
        self._later = self.actor_ref.tell_proxy()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    @RetryMixin._retry()
    def sync_node(self, cloud_node, arvados_node):
        if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
            return self._cloud.sync_node(cloud_node, arvados_node)
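
# --- Illustrative sketch (not part of the original module) ---------------
# sync_node() above is safe to call on every poll because it no-ops when
# the cloud and Arvados hostnames already agree; the retry mixin then
# throttles the real requests when the cloud misbehaves.  A toy model with
# hypothetical stand-in arguments:
def _sync_node_demo(cloud_fqdn, arvados_fqdn, do_sync):
    # do_sync is only invoked when the records disagree, so repeated
    # calls after a successful sync cost nothing.
    if cloud_fqdn != arvados_fqdn:
        return do_sync()
    return None
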
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.tell_proxy()
        self._shutdowns = shutdown_timer
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        self._set_logger()
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        self._logger.debug(msg, *args)

    def get_state(self):
        """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""

        # If this node is not associated with an Arvados node, return
        # 'unpaired' if we're in the boot grace period, and 'down' if not,
        # so it isn't counted towards usable nodes.
        if self.arvados_node is None:
            if timestamp_fresh(self.cloud_node_start_time,
                               self.boot_fail_after):
                return 'unpaired'
            else:
                return 'down'

        state = self.arvados_node['crunch_worker_state']

        # If state information is not available because it is missing or the
        # record is stale, return 'down'.
        if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
                                            self.node_stale_after):
            state = 'down'

        # There's a window between when a node pings for the first time and the
        # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
        # window, the node will still report as 'down'.  Check that
        # first_ping_at is truthy and consider the node 'idle' during the
        # initial boot grace period.
        if (state == 'down' and
            self.arvados_node['first_ping_at'] and
            timestamp_fresh(self.cloud_node_start_time,
                            self.boot_fail_after) and
            not self._cloud.broken(self.cloud_node)):
            state = 'idle'

        # "missing" means last_ping_at is stale; this should be
        # considered "down".
        if arvados_node_missing(self.arvados_node, self.node_stale_after):
            state = 'down'

        # Turns out using 'job_uuid' this way is a bad idea.  The node record
        # is assigned the job_uuid before the job is locked (which removes it
        # from the queue), which means the job is double-counted: it appears
        # both in the wishlist and as keeping a node busy.  The end result is
        # excess nodes being booted.
        #if state == 'idle' and self.arvados_node['job_uuid']:
        #    state = 'busy'

        return state

    def in_state(self, *states):
        return self.get_state() in states

    def shutdown_eligible(self):
        """Determine if node is a candidate for shut down.

        Returns a tuple of (boolean, string) where the first value is whether
        the node is a candidate for shut down, and the second value is the
        reason for the decision.
        """

        # Collect states, then consult the state transition table for whether
        # we should shut down.  Possible states are:
        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
        # window = ["open", "closed"]
        # boot_grace = ["boot wait", "boot exceeded"]
        # idle_grace = ["not idle", "idle wait", "idle exceeded"]

        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return (False, "node state is stale")

        crunch_worker_state = self.get_state()

        window = "open" if self._shutdowns.window_open() else "closed"

        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
            boot_grace = "boot wait"
        else:
            boot_grace = "boot exceeded"

        # API server side not implemented yet.
        idle_grace = 'idle exceeded'

        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
        t = transitions[node_state]
        if t is not None:
            # yes, shutdown eligible
            return (True, "node state is %s" % (node_state,))
        else:
            # no, return a reason
            return (False, "node state is %s" % (node_state,))

    def consider_shutdown(self):
        try:
            eligible, reason = self.shutdown_eligible()
            next_opening = self._shutdowns.next_opening()
            if eligible:
                self._debug("Suggesting shutdown because %s", reason)
                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
            else:
                self._debug("Not eligible for shut down because %s", reason)

                if self.last_shutdown_opening != next_opening:
                    self._debug("Shutdown window closed.  Next at %s.",
                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                    self._timer.schedule(next_opening, self._later.consider_shutdown)
                    self.last_shutdown_opening = next_opening
        except Exception:
            self._logger.exception("Unexpected exception")

    def offer_arvados_pair(self, arvados_node):
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        """Called when the latest Arvados node record is retrieved.

        Calls the updater's sync_node() method.
        """
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            self._update.sync_node(self.cloud_node, self.arvados_node)
        self._later.consider_shutdown()
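
# --- Illustrative sketch (not part of the original module) ---------------
# shutdown_eligible() above keys a 4-tuple of (crunch_worker_state, window,
# boot_grace, idle_grace) into the transitions table imported from
# .transitions.  A hypothetical miniature of that lookup; the real table
# enumerates every combination of the states listed in shutdown_eligible():
def _transitions_demo():
    toy_transitions = {
        ('idle', 'open', 'boot exceeded', 'idle exceeded'): True,
        ('busy', 'open', 'boot exceeded', 'idle exceeded'): None,
    }
    node_state = ('idle', 'open', 'boot exceeded', 'idle exceeded')
    # A non-None entry means the node may be shut down.
    return toy_transitions[node_state] is not None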