import logging
import time
+import libcloud.common.types as cloud_types
import pykka
-from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from .. import \
+ arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
from ...clientactor import _notify_subscribers
from ... import config
This base class takes care of retrying changes and notifying
subscribers when the change is finished.
"""
- def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+ def __init__(self, logger_name, cloud_client, timer_actor,
+ retry_wait, max_retry_wait):
super(ComputeNodeStateChangeBase, self).__init__()
self._later = self.actor_ref.proxy()
- self._timer = timer_actor
self._logger = logging.getLogger(logger_name)
+ self._cloud = cloud_client
+ self._timer = timer_actor
self.min_retry_wait = retry_wait
self.max_retry_wait = max_retry_wait
self.retry_wait = retry_wait
self.subscribers = set()
@staticmethod
- def _retry(errors):
+ def _retry(errors=()):
"""Retry decorator for an actor method that makes remote requests.
Use this function to decorate an actor method, and pass in a
tuple of exceptions to catch. This decorator will schedule
retries of that method with exponential backoff if the
- original method raises any of the given errors.
+ original method raises a known cloud driver error, or any of the
+ given exception types.
"""
def decorator(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def retry_wrapper(self, *args, **kwargs):
+ start_time = time.time()
try:
orig_func(self, *args, **kwargs)
- except errors as error:
+ except Exception as error:
+ if not (isinstance(error, errors) or
+ self._cloud.is_cloud_exception(error)):
+ raise
self._logger.warning(
"Client error: %s - waiting %s seconds",
error, self.retry_wait)
- self._timer.schedule(self.retry_wait,
+ self._timer.schedule(start_time + self.retry_wait,
getattr(self._later,
orig_func.__name__),
*args, **kwargs)
self.retry_wait = min(self.retry_wait * 2,
self.max_retry_wait)
else:
self.retry_wait = self.min_retry_wait
- return wrapper
+ return retry_wrapper
return decorator
def _finished(self):
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+ 'arvnodeman.nodeup', cloud_client, timer_actor,
+ retry_wait, max_retry_wait)
self._arvados = arvados_client
- self._cloud = cloud_client
self.cloud_size = cloud_size
self.arvados_node = None
self.cloud_node = None
).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+ @ComputeNodeStateChangeBase._retry()
def create_cloud_node(self):
self._logger.info("Creating cloud node with size %s.",
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
self._logger.info("Cloud node %s created.", self.cloud_node.id)
+ self._later.post_create()
+
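+ # Running post_create as a separate actor message means a failure in the
+ # cloud client's post-create step is retried on its own, without
+ # re-issuing the node creation request above.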
+ @ComputeNodeStateChangeBase._retry()
+ def post_create(self):
+ self._cloud.post_create_node(self.cloud_node)
+ self._logger.info("%s post-create work done.", self.cloud_node.id)
self._finished()
def stop_if_no_cloud_node(self):
- if self.cloud_node is None:
- self.stop()
+ if self.cloud_node is not None:
+ return False
+ self.stop()
+ return True
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
This actor simply destroys a cloud node, retrying as needed.
"""
- def __init__(self, timer_actor, cloud_client, cloud_node,
- retry_wait=1, max_retry_wait=180):
+ def __init__(self, timer_actor, cloud_client, node_monitor,
+ cancellable=True, retry_wait=1, max_retry_wait=180):
+ # If a ShutdownActor is cancellable, it will ask the
+ # ComputeNodeMonitorActor if it's still eligible before taking each
+ # action, and stop the shutdown process if the node is no longer
+ # eligible. Normal shutdowns based on job demand should be
+ # cancellable; shutdowns based on node misbehavior should not.
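+ # For example (illustrative actor startup, not part of this change):
+ #     ComputeNodeShutdownActor.start(timer, cloud, monitor).proxy()
+ #         # demand-based shutdown; cancelled if the node becomes busy
+ #     ComputeNodeShutdownActor.start(timer, cloud, monitor,
+ #                                    cancellable=False).proxy()
+ #         # misbehaving node; shutdown proceeds regardless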
super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
- self._cloud = cloud_client
- self.cloud_node = cloud_node
+ 'arvnodeman.nodedown', cloud_client, timer_actor,
+ retry_wait, max_retry_wait)
+ self._monitor = node_monitor.proxy()
+ self.cloud_node = self._monitor.cloud_node.get()
+ self.cancellable = cancellable
+ self.success = None
+
+ def on_start(self):
self._later.shutdown_node()
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
- def shutdown_node(self):
- self._cloud.destroy_node(self.cloud_node)
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ def cancel_shutdown(self):
+ self.success = False
self._finished()
+ def _stop_if_window_closed(orig_func):
+ @functools.wraps(orig_func)
+ def stop_wrapper(self, *args, **kwargs):
+ if (self.cancellable and
+ (not self._monitor.shutdown_eligible().get())):
+ self._logger.info(
+ "Cloud node %s shutdown cancelled - no longer eligible.",
+ self.cloud_node.id)
+ self._later.cancel_shutdown()
+ return None
+ else:
+ return orig_func(self, *args, **kwargs)
+ return stop_wrapper
+
+ @_stop_if_window_closed
+ @ComputeNodeStateChangeBase._retry()
+ def shutdown_node(self):
+ if self._cloud.destroy_node(self.cloud_node):
+ self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ self.success = True
+ self._finished()
+ else:
+ # Force a retry.
+ raise cloud_types.LibcloudError("destroy_node failed")
+
+ # Make the decorator available to subclasses.
+ _stop_if_window_closed = staticmethod(_stop_if_window_closed)
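+ # A hypothetical subclass could reuse it on its own steps, e.g.:
+ #     class SlurmShutdownActor(ComputeNodeShutdownActor):
+ #         @ComputeNodeShutdownActor._stop_if_window_closed
+ #         @ComputeNodeStateChangeBase._retry()
+ #         def drain_node(self):
+ #             ...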
+
class ComputeNodeUpdateActor(config.actor_class):
"""Actor to dispatch one-off cloud management requests.
def _throttle_errors(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def throttle_wrapper(self, *args, **kwargs):
throttle_time = self.next_request_time - time.time()
if throttle_time > 0:
time.sleep(throttle_time)
self.next_request_time = time.time()
try:
result = orig_func(self, *args, **kwargs)
- except config.CLOUD_ERRORS:
+ except Exception as error:
self.error_streak += 1
self.next_request_time += min(2 ** self.error_streak,
self.max_retry_wait)
raise
else:
self.error_streak = 0
return result
- return wrapper
+ return throttle_wrapper
@_throttle_errors
def sync_node(self, cloud_node, arvados_node):
for shutdown.
"""
def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
- timer_actor, update_actor, arvados_node=None,
+ cloud_fqdn_func, timer_actor, update_actor, arvados_node=None,
poll_stale_after=600, node_stale_after=3600):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.proxy()
self._logger = logging.getLogger('arvnodeman.computenode')
self._last_log = None
self._shutdowns = shutdown_timer
+ self._cloud_node_fqdn = cloud_fqdn_func
self._timer = timer_actor
self._update = update_actor
self.cloud_node = cloud_node
if (self.arvados_node is None) or not timestamp_fresh(
arvados_node_mtime(self.arvados_node), self.node_stale_after):
return None
- state = self.arvados_node['info'].get('slurm_state')
+ state = self.arvados_node['crunch_worker_state']
if not state:
return None
result = state in states
result = result and not self.arvados_node['job_uuid']
return result
- def _shutdown_eligible(self):
- if self.arvados_node is None:
+ def shutdown_eligible(self):
+ if not self._shutdowns.window_open():
+ return False
+ elif self.arvados_node is None:
# If this is a new, unpaired node, it's eligible for
# shutdown--we figure there was an error during bootstrap.
return timestamp_fresh(self.cloud_node_start_time,
def consider_shutdown(self):
next_opening = self._shutdowns.next_opening()
- if self._shutdowns.window_open():
- if self._shutdown_eligible():
- self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- else:
- self._debug("Node %s shutdown window open but node busy.",
- self.cloud_node.id)
- else:
+ if self.shutdown_eligible():
+ self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+ _notify_subscribers(self._later, self.subscribers)
+ elif self._shutdowns.window_open():
+ self._debug("Node %s shutdown window open but node busy.",
+ self.cloud_node.id)
+ elif self.last_shutdown_opening != next_opening:
self._debug("Node %s shutdown window closed. Next at %s.",
self.cloud_node.id, time.ctime(next_opening))
- if self.last_shutdown_opening != next_opening:
self._timer.schedule(next_opening, self._later.consider_shutdown)
self.last_shutdown_opening = next_opening
def offer_arvados_pair(self, arvados_node):
- if self.arvados_node is not None:
+ first_ping_s = arvados_node.get('first_ping_at')
+ if (self.arvados_node is not None) or (not first_ping_s):
return None
- elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+ elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+ (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
self._later.update_arvados_node(arvados_node)
return self.cloud_node.id
else:
self._later.consider_shutdown()
def update_arvados_node(self, arvados_node):
+ # If the cloud node's FQDN doesn't match what's in the Arvados node
+ # record, make them match.
+ # This method is a little unusual in the way it just fires off the
+ # request without checking the result or retrying errors. That's
+ # because this update happens every time we reload the Arvados node
+ # list: if a previous sync attempt failed, we'll see that the names
+ # are out of sync and just try again. ComputeNodeUpdateActor has
+ # the logic to throttle those effective retries when there's trouble.
if arvados_node is not None:
self.arvados_node = arvados_node
- new_hostname = arvados_node_fqdn(self.arvados_node)
- if new_hostname != self.cloud_node.name:
+ if (self._cloud_node_fqdn(self.cloud_node) !=
+ arvados_node_fqdn(self.arvados_node)):
self._update.sync_node(self.cloud_node, self.arvados_node)
self._later.consider_shutdown()