X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b626a85eb86fd4909712852040cd305c71c37ee5..6874da0dd56d6c0320880dfb3bd4da34a3c0a7d3:/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py?ds=sidebyside diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py index d613ef1371..ae0a65b931 100644 --- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py +++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py @@ -6,6 +6,7 @@ import functools import logging import time +import libcloud.common.types as cloud_types import pykka from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh @@ -40,13 +41,14 @@ class ComputeNodeStateChangeBase(config.actor_class): def decorator(orig_func): @functools.wraps(orig_func) def wrapper(self, *args, **kwargs): + start_time = time.time() try: orig_func(self, *args, **kwargs) except errors as error: self._logger.warning( "Client error: %s - waiting %s seconds", error, self.retry_wait) - self._timer.schedule(self.retry_wait, + self._timer.schedule(start_time + self.retry_wait, getattr(self._later, orig_func.__name__), *args, **kwargs) @@ -133,19 +135,48 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): This actor simply destroys a cloud node, retrying as needed. """ - def __init__(self, timer_actor, cloud_client, cloud_node, + def __init__(self, timer_actor, cloud_client, node_monitor, retry_wait=1, max_retry_wait=180): super(ComputeNodeShutdownActor, self).__init__( 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait) self._cloud = cloud_client - self.cloud_node = cloud_node + self._monitor = node_monitor.proxy() + self.cloud_node = self._monitor.cloud_node.get() + self.success = None + + def on_start(self): self._later.shutdown_node() + def cancel_shutdown(self): + self.success = False + self._finished() + + def _stop_if_window_closed(orig_func): + @functools.wraps(orig_func) + def wrapper(self, *args, **kwargs): + if not self._monitor.shutdown_eligible().get(): + self._logger.info( + "Cloud node %s shutdown cancelled - no longer eligible.", + self.cloud_node.id) + self._later.cancel_shutdown() + return None + else: + return orig_func(self, *args, **kwargs) + return wrapper + + @_stop_if_window_closed @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS) def shutdown_node(self): - self._cloud.destroy_node(self.cloud_node) - self._logger.info("Cloud node %s shut down.", self.cloud_node.id) - self._finished() + if self._cloud.destroy_node(self.cloud_node): + self._logger.info("Cloud node %s shut down.", self.cloud_node.id) + self.success = True + self._finished() + else: + # Force a retry. + raise cloud_types.LibcloudError("destroy_node failed") + + # Make the decorator available to subclasses. + _stop_if_window_closed = staticmethod(_stop_if_window_closed) class ComputeNodeUpdateActor(config.actor_class): @@ -246,8 +277,10 @@ class ComputeNodeMonitorActor(config.actor_class): result = result and not self.arvados_node['job_uuid'] return result - def _shutdown_eligible(self): - if self.arvados_node is None: + def shutdown_eligible(self): + if not self._shutdowns.window_open(): + return False + elif self.arvados_node is None: # If this is a new, unpaired node, it's eligible for # shutdown--we figure there was an error during bootstrap. return timestamp_fresh(self.cloud_node_start_time, @@ -257,17 +290,15 @@ class ComputeNodeMonitorActor(config.actor_class): def consider_shutdown(self): next_opening = self._shutdowns.next_opening() - if self._shutdowns.window_open(): - if self._shutdown_eligible(): - self._debug("Node %s suggesting shutdown.", self.cloud_node.id) - _notify_subscribers(self._later, self.subscribers) - else: - self._debug("Node %s shutdown window open but node busy.", - self.cloud_node.id) - else: + if self.shutdown_eligible(): + self._debug("Node %s suggesting shutdown.", self.cloud_node.id) + _notify_subscribers(self._later, self.subscribers) + elif self._shutdowns.window_open(): + self._debug("Node %s shutdown window open but node busy.", + self.cloud_node.id) + elif self.last_shutdown_opening != next_opening: self._debug("Node %s shutdown window closed. Next at %s.", self.cloud_node.id, time.ctime(next_opening)) - if self.last_shutdown_opening != next_opening: self._timer.schedule(next_opening, self._later.consider_shutdown) self.last_shutdown_opening = next_opening