import libcloud.common.types as cloud_types
import pykka
-from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from .. import \
+ arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
from ...clientactor import _notify_subscribers
from ... import config
This base class takes care of retrying changes and notifying
subscribers when the change is finished.
"""
- def __init__(self, logger_name, cloud_client, timer_actor,
+ def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait):
super(ComputeNodeStateChangeBase, self).__init__()
self._later = self.actor_ref.proxy()
self._logger = logging.getLogger(logger_name)
self._cloud = cloud_client
+ self._arvados = arvados_client
self._timer = timer_actor
self.min_retry_wait = retry_wait
self.max_retry_wait = max_retry_wait
else:
self.subscribers.add(subscriber)
+ def _clean_arvados_node(self, arvados_node, explanation):
+     """Blank out an Arvados node record and note why it was done.
+
+     Sends an update for the record identified by arvados_node['uuid']
+     that sets hostname, ip_address, slot_number, first_ping_at and
+     last_ping_at to None, clears info['ec2_instance_id'], and stores
+     explanation in info['last_action'].  Returns whatever the update
+     request's execute() call returns.
+     """
+     cleared_info = {'ec2_instance_id': None, 'last_action': explanation}
+     cleared_fields = {
+         'hostname': None,
+         'ip_address': None,
+         'slot_number': None,
+         'first_ping_at': None,
+         'last_ping_at': None,
+         'info': cleared_info,
+     }
+     update_request = self._arvados.nodes().update(
+         uuid=arvados_node['uuid'], body=cleared_fields)
+     return update_request.execute()
+
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
"""Actor to create and set up a cloud compute node.
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', cloud_client, timer_actor,
+ 'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
- self._arvados = arvados_client
self.cloud_size = cloud_size
self.arvados_node = None
self.cloud_node = None
else:
self._later.prepare_arvados_node(arvados_node)
- @ComputeNodeStateChangeBase._retry()
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
+ # Create a new Arvados node record with an empty body, keep the API
+ # response in self.arvados_node, then ask this actor's own proxy
+ # (self._later) to run create_cloud_node.
+ # NOTE(review): _retry(config.ARVADOS_ERRORS) presumably retries this
+ # step on those Arvados errors -- confirm against _retry.
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry()
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
- self.arvados_node = self._arvados.nodes().update(
- uuid=node['uuid'],
- body={'hostname': None,
- 'ip_address': None,
- 'slot_number': None,
- 'first_ping_at': None,
- 'last_ping_at': None,
- 'info': {'ec2_instance_id': None,
- 'last_action': "Prepared by Node Manager"}}
- ).execute()
+ # Reset the existing Arvados record `node` through the shared
+ # _clean_arvados_node helper and keep the helper's return value in
+ # self.arvados_node.
+ self.arvados_node = self._clean_arvados_node(
+ node, "Prepared by Node Manager")
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._retry()
self._finished()
def stop_if_no_cloud_node(self):
- if self.cloud_node is None:
- self.stop()
+ # Stop this actor only while it has no cloud node.  Returns True
+ # when the actor was stopped, and False when a cloud node already
+ # exists (in which case nothing is stopped).
+ if self.cloud_node is not None:
+ return False
+ self.stop()
+ return True
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
This actor simply destroys a cloud node, retrying as needed.
"""
- def __init__(self, timer_actor, cloud_client, node_monitor,
+ def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
cancellable=True, retry_wait=1, max_retry_wait=180):
# If a ShutdownActor is cancellable, it will ask the
# ComputeNodeMonitorActor if it's still eligible before taking each
# eligible. Normal shutdowns based on job demand should be
# cancellable; shutdowns based on node misbehavior should not.
super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', cloud_client, timer_actor,
+ 'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
self._monitor = node_monitor.proxy()
self.cloud_node = self._monitor.cloud_node.get()
def on_start(self):
+ # Begin the shutdown by asking this actor's own proxy (self._later)
+ # to run shutdown_node.
self._later.shutdown_node()
+ def _arvados_node(self):
+     """Return the Arvados node record currently held by the monitor.
+
+     Reads the arvados_node attribute through the monitor's proxy
+     (self._monitor) and waits for the value with .get().
+     """
+     pending_record = self._monitor.arvados_node
+     return pending_record.get()
+
+ def _finished(self, success_flag=None):
+     """Record the outcome if one is given, then run the base _finished().
+
+     When success_flag is not None it is stored in self.success before
+     delegating to the parent class; passing None leaves self.success
+     untouched.  Returns the parent's return value.
+     """
+     outcome_given = success_flag is not None
+     if outcome_given:
+         self.success = success_flag
+     base = super(ComputeNodeShutdownActor, self)
+     return base._finished()
+
def cancel_shutdown(self):
- self.success = False
- self._finished()
+ # Abandon the shutdown: record the outcome as unsuccessful and
+ # finish, both through _finished(success_flag=False).
+ self._finished(success_flag=False)
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
@_stop_if_window_closed
@ComputeNodeStateChangeBase._retry()
def shutdown_node(self):
- if self._cloud.destroy_node(self.cloud_node):
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
- self.success = True
- self._finished()
- else:
+ # Destroy the cloud node.  A falsy result from destroy_node is turned
+ # into a LibcloudError so that the attempt is retried.
+ if not self._cloud.destroy_node(self.cloud_node):
# Force a retry.
raise cloud_types.LibcloudError("destroy_node failed")
+ self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ # With the cloud node destroyed, fetch the Arvados record the monitor
+ # holds for it.  If there is none, the shutdown is finished; otherwise
+ # the record is cleaned in a separate step (clean_arvados_node), which
+ # reports success itself.
+ arv_node = self._arvados_node()
+ if arv_node is None:
+ self._finished(success_flag=True)
+ else:
+ self._later.clean_arvados_node(arv_node)
+
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ def clean_arvados_node(self, arvados_node):
+     """Reset the node's Arvados record, then report a successful shutdown."""
+     reason = "Shut down by Node Manager"
+     self._clean_arvados_node(arvados_node, reason)
+     self._finished(success_flag=True)
# Make the decorator available to subclasses.
_stop_if_window_closed = staticmethod(_stop_if_window_closed)
for shutdown.
"""
def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
- timer_actor, update_actor, arvados_node=None,
- poll_stale_after=600, node_stale_after=3600):
+ cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+ arvados_node=None, poll_stale_after=600, node_stale_after=3600,
+ boot_fail_after=1800
+ ):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.proxy()
self._logger = logging.getLogger('arvnodeman.computenode')
self._last_log = None
self._shutdowns = shutdown_timer
+ self._cloud_node_fqdn = cloud_fqdn_func
self._timer = timer_actor
self._update = update_actor
+ self._cloud = cloud_client
self.cloud_node = cloud_node
self.cloud_node_start_time = cloud_node_start_time
self.poll_stale_after = poll_stale_after
self.node_stale_after = node_stale_after
+ self.boot_fail_after = boot_fail_after
self.subscribers = set()
self.arvados_node = None
self._later.update_arvados_node(arvados_node)
if (self.arvados_node is None) or not timestamp_fresh(
arvados_node_mtime(self.arvados_node), self.node_stale_after):
return None
- state = self.arvados_node['info'].get('slurm_state')
+ state = self.arvados_node['crunch_worker_state']
if not state:
return None
result = state in states
if not self._shutdowns.window_open():
return False
elif self.arvados_node is None:
- # If this is a new, unpaired node, it's eligible for
- # shutdown--we figure there was an error during bootstrap.
- return timestamp_fresh(self.cloud_node_start_time,
- self.node_stale_after)
+ # Node is unpaired.
+ # If it hasn't pinged Arvados after boot_fail_after seconds, shut it down.
+ return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
+ elif self.arvados_node.get('status') == "missing" and self._cloud.broken(self.cloud_node):
+ # Node is paired, but Arvados says it is missing and the cloud says the node
+ # is in an error state, so shut it down.
+ return True
else:
return self.in_state('idle')
self.last_shutdown_opening = next_opening
def offer_arvados_pair(self, arvados_node):
- if self.arvados_node is not None:
+ first_ping_s = arvados_node.get('first_ping_at')
+ if (self.arvados_node is not None) or (not first_ping_s):
return None
- elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+ elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+ (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
self._later.update_arvados_node(arvados_node)
return self.cloud_node.id
else:
self._later.consider_shutdown()
def update_arvados_node(self, arvados_node):
+ # Store the given Arvados node record (if any) on this monitor.
+ #
+ # If the cloud node's FQDN doesn't match what's in the Arvados node
+ # record, make them match.
+ # This method is a little unusual in the way it just fires off the
+ # request without checking the result or retrying errors. That's
+ # because this update happens every time we reload the Arvados node
+ # list: if a previous sync attempt failed, we'll see that the names
+ # are out of sync and just try again. ComputeNodeUpdateActor has
+ # the logic to throttle those effective retries when there's trouble.
if arvados_node is not None:
self.arvados_node = arvados_node
- new_hostname = arvados_node_fqdn(self.arvados_node)
- if new_hostname != self.cloud_node.name:
+ if (self._cloud_node_fqdn(self.cloud_node) !=
+ arvados_node_fqdn(self.arvados_node)):
self._update.sync_node(self.cloud_node, self.arvados_node)
self._later.consider_shutdown()