# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import absolute_import, print_function

import functools
import logging
import re
import time

import libcloud.common.types as cloud_types
from libcloud.common.exceptions import BaseHTTPError
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
from .transitions import transitions

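# Sentinel stored in ComputeNodeSetupActor.error when the cloud refuses to
# create a node because an account quota or resource limit has been reached.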
QuotaExceeded = "QuotaExceeded"


class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        self._later = self.actor_ref.tell_proxy()
        self._arvados = arvados_client
        self.subscribers = set()

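    # Name the logger after the class and the tail of the actor URN, so log
    # lines can be traced back to one specific actor instance.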
    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _finished(self):
        if self.subscribers is None:
            raise Exception("Actor tried to finish twice")
        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
        self.subscribers = None
        self._logger.info("finished")

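    # Register a callback to run when this state change finishes.  If the
    # change has already finished, notify the new subscriber immediately.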
    def subscribe(self, subscriber):
        if self.subscribers is None:
            try:
                subscriber(self.actor_ref.proxy())
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'first_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()

    @staticmethod
    def _finish_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._finished()
        return finish_wrapper


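# A minimal usage sketch (not from this file): ComputeNodeSetupActor below is
# a pykka actor, so a caller would typically start it and subscribe to the
# result.  `daemon.node_setup_finished` is a hypothetical subscriber callback.
#
#   setup = ComputeNodeSetupActor.start(
#       timer_actor, arvados_client, cloud_client, cloud_size).proxy()
#   setup.subscribe(daemon.node_setup_finished)
#
# The actor then works through create_arvados_node (or prepare_arvados_node),
# create_cloud_node, update_arvados_node_properties and post_create, notifying
# subscribers once the chain finishes.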
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        self.error = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry()
    def create_cloud_node(self):
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.name)
        try:
            self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                      self.arvados_node)
        except BaseHTTPError as e:
            if e.code == 429 or "RequestLimitExceeded" in e.message:
                # Don't consider API rate limits to be quota errors.
                # Re-raise so the Retry logic applies.
                raise

            # The set of possible error codes / messages isn't documented for
            # all clouds, so use a keyword heuristic to determine if the
            # failure is likely due to a quota.
            if re.search(r'(exceed|quota|limit)', e.message, re.I):
                self.error = QuotaExceeded
                self._logger.warning("Quota exceeded: %s", e)
                self._finished()
                return
            else:
                # Something else happened; re-raise so the Retry logic applies.
                raise
        except Exception as e:
            raise

        # The information included in the node size object we get from libcloud
        # is inconsistent between cloud drivers.  Replace the libcloud NodeSize
        # object with a compatible CloudSizeWrapper object, which merges the
        # size info reported by the cloud with size information from the
        # configuration file.
        self.cloud_node.size = self.cloud_size

        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.
        """
        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            # created.
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        }
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
        ).execute()
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()

    @RetryMixin._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

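    # Stop this actor, but only if the cloud node has not been created yet.
    # Returns True when the actor was stopped, and False when a cloud node
    # already exists and the setup must be allowed to finish.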
    def stop_if_no_cloud_node(self):
        if self.cloud_node is not None:
            return False
        self.stop()
        return True


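# A minimal usage sketch (not from this file): the shutdown actor is started
# with an already-running ComputeNodeMonitorActor ref and begins work
# immediately from on_start().  `monitor_ref` is a hypothetical monitor actor.
#
#   shutdown = ComputeNodeShutdownActor.start(
#       timer_actor, cloud_client, arvados_client, monitor_ref).proxy()
#   shutdown.subscribe(...)
#
# Cancellable shutdowns re-check eligibility with the monitor before calling
# destroy_node; shutdowns for misbehaving nodes pass cancellable=False.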
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    DESTROY_FAILED = "destroy_node failed"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        self.success = None

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason, **kwargs):
        if self.cancel_reason is not None:
            # Already cancelled.
            return
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)

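    # Decorator: if the wrapped method raises, log the error and cancel the
    # shutdown instead of letting the exception kill the actor.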
    def _cancel_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._logger.debug("", exc_info=True)
                self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
        return finish_wrapper

    @_cancel_on_exception
    def shutdown_node(self):
        if self.cancel_reason is not None:
            # Already cancelled.
            return
        if self.cancellable:
            self._logger.info("Checking that node is still eligible for shutdown")
            eligible, reason = self._monitor.shutdown_eligible().get()
            if not eligible:
                self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
                                     try_resume=True)
                return
        self._destroy_node()

    def _destroy_node(self):
        self._logger.info("Starting shutdown")
        arv_node = self._arvados_node()
        if self._cloud.destroy_node(self.cloud_node):
            self._logger.info("Shutdown success")
            if arv_node:
                self._later.clean_arvados_node(arv_node)
            else:
                self._finished(success_flag=True)
        else:
            self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)


class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.
    """
    def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        RetryMixin.__init__(self, 1, max_retry_wait,
                            None, cloud_factory(), timer_actor)
        self._cloud = cloud_factory()
        self._later = self.actor_ref.tell_proxy()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    @RetryMixin._retry()
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)


class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.tell_proxy()
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        self._set_logger()
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        self._logger.debug(msg, *args)

365 """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
367 # If this node is not associated with an Arvados node, return
368 # 'unpaired' if we're in the boot grace period, and 'down' if not,
369 # so it isn't counted towards usable nodes.
370 if self.arvados_node is None:
371 if timestamp_fresh(self.cloud_node_start_time,
372 self.boot_fail_after):
377 state = self.arvados_node['crunch_worker_state']
379 # If state information is not available because it is missing or the
380 # record is stale, return 'down'.
381 if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
382 self.node_stale_after):
385 # There's a window between when a node pings for the first time and the
386 # value of 'slurm_state' is synchronized by crunch-dispatch. In this
387 # window, the node will still report as 'down'. Check that
388 # first_ping_at is truthy and consider the node 'idle' during the
389 # initial boot grace period.
390 if (state == 'down' and
391 self.arvados_node['first_ping_at'] and
392 timestamp_fresh(self.cloud_node_start_time,
393 self.boot_fail_after) and
394 not self._cloud.broken(self.cloud_node)):
397 # "missing" means last_ping_at is stale, this should be
399 if arvados_node_missing(self.arvados_node, self.node_stale_after):
402 # Turns out using 'job_uuid' this way is a bad idea. The node record
403 # is assigned the job_uuid before the job is locked (which removes it
404 # from the queue) which means the job will be double-counted as both in
405 # the wishlist and but also keeping a node busy. This end result is
406 # excess nodes being booted.
407 #if state == 'idle' and self.arvados_node['job_uuid']:
412 def in_state(self, *states):
413 return self.get_state() in states
    def shutdown_eligible(self):
        """Determine if the node is a candidate for shut down.

        Returns a tuple of (boolean, string) where the first value is whether
        the node is a candidate for shut down, and the second value is the
        reason for the decision.
        """

        # Collect the relevant states and consult the state transition table
        # to decide whether we should shut down.  Possible states are:
        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
        # window = ["open", "closed"]
        # boot_grace = ["boot wait", "boot exceeded"]
        # idle_grace = ["not idle", "idle wait", "idle exceeded"]
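        # The transitions table maps one of these 4-tuples to a non-None entry
        # when the node may be shut down in that state, and to None when it
        # must be kept.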

        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return (False, "node state is stale")

        crunch_worker_state = self.get_state()

        window = "open" if self._shutdowns.window_open() else "closed"

        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
            boot_grace = "boot wait"
        else:
            boot_grace = "boot exceeded"

        # API server side not implemented yet.
        idle_grace = 'idle exceeded'

        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
        t = transitions[node_state]
        if t is not None:
            # Yes, shutdown eligible.
            return (True, "node state is %s" % (node_state,))
        else:
            # No: return a reason.
            return (False, "node state is %s" % (node_state,))

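    # Re-evaluate shutdown eligibility.  Notify subscribers when the node is
    # eligible; otherwise schedule another check for the next time the
    # shutdown window opens.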
    def consider_shutdown(self):
        try:
            eligible, reason = self.shutdown_eligible()
            next_opening = self._shutdowns.next_opening()
            if eligible:
                self._debug("Suggesting shutdown because %s", reason)
                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
            else:
                self._debug("Not eligible for shut down because %s", reason)

                if self.last_shutdown_opening != next_opening:
                    self._debug("Shutdown window closed.  Next at %s.",
                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                    self._timer.schedule(next_opening, self._later.consider_shutdown)
                    self.last_shutdown_opening = next_opening
        except Exception:
            self._logger.exception("Unexpected exception")

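    # Offered an Arvados node record that might belong to this cloud node:
    # adopt it and return our cloud node id when the instance id matches and
    # the first ping came after the node booted; otherwise return None.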
    def offer_arvados_pair(self, arvados_node):
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

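    # Adopt a fresher copy of the cloud node record, then re-check whether the
    # node should be shut down.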
    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                  arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()