import logging
import time
+import libcloud.common.types as cloud_types
import pykka
from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
def decorator(orig_func):
@functools.wraps(orig_func)
def wrapper(self, *args, **kwargs):
+ start_time = time.time()
try:
orig_func(self, *args, **kwargs)
except errors as error:
self._logger.warning(
"Client error: %s - waiting %s seconds",
error, self.retry_wait)
- self._timer.schedule(self.retry_wait,
+ self._timer.schedule(start_time + self.retry_wait,
getattr(self._later,
orig_func.__name__),
*args, **kwargs)
This actor simply destroys a cloud node, retrying as needed.
"""
- def __init__(self, timer_actor, cloud_client, cloud_node,
+ def __init__(self, timer_actor, cloud_client, node_monitor,  # takes a node_monitor (must offer .proxy()) instead of a cloud_node
retry_wait=1, max_retry_wait=180):
super(ComputeNodeShutdownActor, self).__init__(
'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
self._cloud = cloud_client
- self.cloud_node = cloud_node
+ self._monitor = node_monitor.proxy()  # proxy kept so shutdown eligibility can be re-queried before each attempt
+ self.cloud_node = self._monitor.cloud_node.get()  # cloud node is now read from the monitor; .get() resolves the proxied attribute
+ self.success = None  # None while pending; set True by shutdown_node, False by cancel_shutdown
+
+ def on_start(self):  # queues the first shutdown_node call via self._later (moved here out of __init__)
self._later.shutdown_node()
+ def cancel_shutdown(self):  # abandon the shutdown: record failure, then run the normal completion path
+ self.success = False  # distinguishes a cancel from a completed destroy (True) and from still-pending (None)
+ self._finished()  # same completion call shutdown_node makes on success
+
+ def _stop_if_window_closed(orig_func):  # method decorator: run orig_func only while the monitor still reports the node shutdown-eligible
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ if not self._monitor.shutdown_eligible().get():  # .get() waits for the monitor's answer before deciding
+ self._logger.info(
+ "Cloud node %s shutdown cancelled - no longer eligible.",
+ self.cloud_node.id)
+ self._later.cancel_shutdown()  # queued through self._later rather than called directly
+ return None  # orig_func is not called on this path
+ else:
+ return orig_func(self, *args, **kwargs)  # still eligible: pass the call and its result straight through
+ return wrapper
+
+ @_stop_if_window_closed  # outermost decorator, so eligibility is checked before the _retry-wrapped body runs
@ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
def shutdown_node(self):
- self._cloud.destroy_node(self.cloud_node)
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
- self._finished()
+ if self._cloud.destroy_node(self.cloud_node):  # a truthy return is treated as a successful destroy
+ self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ self.success = True  # recorded before _finished() is called
+ self._finished()
+ else:
+ # Force a retry.
+ raise cloud_types.LibcloudError("destroy_node failed")  # NOTE(review): presumably LibcloudError is in config.CLOUD_ERRORS so _retry reschedules - confirm
+
+ # Make the decorator available to subclasses.
+ _stop_if_window_closed = staticmethod(_stop_if_window_closed)  # re-bound only after its use on shutdown_node above, where it is applied as a plain function
class ComputeNodeUpdateActor(config.actor_class):
result = result and not self.arvados_node['job_uuid']
return result
- def _shutdown_eligible(self):
- if self.arvados_node is None:
+ def shutdown_eligible(self):
+ if not self._shutdowns.window_open():
+ return False
+ elif self.arvados_node is None:
# If this is a new, unpaired node, it's eligible for
# shutdown--we figure there was an error during bootstrap.
return timestamp_fresh(self.cloud_node_start_time,
def consider_shutdown(self):
next_opening = self._shutdowns.next_opening()
- if self._shutdowns.window_open():
- if self._shutdown_eligible():
- self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- else:
- self._debug("Node %s shutdown window open but node busy.",
- self.cloud_node.id)
- else:
+ if self.shutdown_eligible():  # shutdown_eligible() now checks the window itself, so no outer window_open() test is needed
+ self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+ _notify_subscribers(self._later, self.subscribers)
+ elif self._shutdowns.window_open():  # not eligible although the window is open
+ self._debug("Node %s shutdown window open but node busy.",
+ self.cloud_node.id)
+ elif self.last_shutdown_opening != next_opening:  # window closed: log and schedule a re-check only when the next opening has changed
self._debug("Node %s shutdown window closed. Next at %s.",
self.cloud_node.id, time.ctime(next_opening))
- if self.last_shutdown_opening != next_opening:
self._timer.schedule(next_opening, self._later.consider_shutdown)
self.last_shutdown_opening = next_opening