This base class takes care of retrying changes and notifying
subscribers when the change is finished.
"""
- def __init__(self, logger_name, cloud_client, timer_actor,
+ def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait):
super(ComputeNodeStateChangeBase, self).__init__()
self._later = self.actor_ref.proxy()
self._logger = logging.getLogger(logger_name)
self._cloud = cloud_client
+ self._arvados = arvados_client
self._timer = timer_actor
self.min_retry_wait = retry_wait
self.max_retry_wait = max_retry_wait
else:
self.subscribers.add(subscriber)
def _clean_arvados_node(self, arvados_node, explanation):
    """Blank out the identifying fields of an Arvados node record.

    Sends an update for the record whose UUID is arvados_node['uuid'],
    setting hostname, ip_address, slot_number, first_ping_at and
    last_ping_at to None, and replacing info with a dict whose
    ec2_instance_id is None and whose last_action is `explanation`.

    Returns whatever the update request's execute() call returns.
    """
    nodes_api = self._arvados.nodes()
    reset_fields = {
        'hostname': None,
        'ip_address': None,
        'slot_number': None,
        'first_ping_at': None,
        'last_ping_at': None,
        'info': {
            'ec2_instance_id': None,
            'last_action': explanation,
        },
    }
    update_request = nodes_api.update(uuid=arvados_node['uuid'],
                                      body=reset_fields)
    return update_request.execute()
+
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
"""Actor to create and set up a cloud compute node.
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', cloud_client, timer_actor,
+ 'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
- self._arvados = arvados_client
self.cloud_size = cloud_size
self.arvados_node = None
self.cloud_node = None
else:
self._later.prepare_arvados_node(arvados_node)
@ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
    """Create a new, empty Arvados node record, then move on to the cloud.

    The created record is stored in self.arvados_node, after which this
    actor's own proxy (self._later) is asked to run create_cloud_node.
    NOTE(review): presumably the _retry decorator re-runs this method on
    config.ARVADOS_ERRORS; its definition is not visible here.
    """
    create_request = self._arvados.nodes().create(body={})
    self.arvados_node = create_request.execute()
    self._later.create_cloud_node()
@ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
    """Reset an existing Arvados node record, then move on to the cloud.

    `node` is passed to _clean_arvados_node with the explanation
    "Prepared by Node Manager"; the result is kept in self.arvados_node.
    This actor's own proxy (self._later) is then asked to run
    create_cloud_node.
    NOTE(review): presumably the _retry decorator re-runs this method on
    config.ARVADOS_ERRORS; its definition is not visible here.
    """
    cleaned_record = self._clean_arvados_node(
        node, "Prepared by Node Manager")
    self.arvados_node = cleaned_record
    self._later.create_cloud_node()
@ComputeNodeStateChangeBase._retry()
self._finished()
def stop_if_no_cloud_node(self):
    """Stop this actor unless a cloud node has already been recorded.

    Returns True when self.cloud_node is None (stop() has been called),
    and False when a cloud node is present (nothing is done).
    """
    if self.cloud_node is None:
        self.stop()
        return True
    return False
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
This actor simply destroys a cloud node, retrying as needed.
"""
- def __init__(self, timer_actor, cloud_client, node_monitor,
+ def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
cancellable=True, retry_wait=1, max_retry_wait=180):
# If a ShutdownActor is cancellable, it will ask the
# ComputeNodeMonitorActor if it's still eligible before taking each
# eligible. Normal shutdowns based on job demand should be
# cancellable; shutdowns based on node misbehavior should not.
super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', cloud_client, timer_actor,
+ 'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
self._monitor = node_monitor.proxy()
self.cloud_node = self._monitor.cloud_node.get()
def on_start(self):
    """Kick off the shutdown by asking our own proxy to run shutdown_node."""
    own_proxy = self._later
    own_proxy.shutdown_node()
def _arvados_node(self):
    """Return the monitor's arvados_node value.

    The attribute is read through the monitor proxy and resolved with
    get(), so the caller receives the value itself.
    """
    pending_value = self._monitor.arvados_node
    return pending_value.get()
+
def _finished(self, success_flag=None):
    """Optionally record the outcome, then run the base class's _finished.

    When success_flag is not None it is stored in self.success first;
    when it is None, self.success is left untouched.  Returns whatever
    the parent class's _finished() returns.
    """
    outcome_supplied = success_flag is not None
    if outcome_supplied:
        self.success = success_flag
    parent = super(ComputeNodeShutdownActor, self)
    return parent._finished()
+
def cancel_shutdown(self):
    """Finish this shutdown attempt, recording it as unsuccessful."""
    shutdown_succeeded = False
    self._finished(success_flag=shutdown_succeeded)
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
@_stop_if_window_closed
@ComputeNodeStateChangeBase._retry()
def shutdown_node(self):
    """Destroy the cloud node, then clean up its Arvados record if any.

    If the cloud driver reports that destroy_node did not succeed, a
    LibcloudError is raised to force a retry.  On success the shutdown
    is logged.  When the monitor has no Arvados node record the actor
    finishes immediately with success; otherwise this actor's own proxy
    is asked to run clean_arvados_node on that record.
    """
    destroyed = self._cloud.destroy_node(self.cloud_node)
    if not destroyed:
        # Force a retry.
        raise cloud_types.LibcloudError("destroy_node failed")
    self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
    node_record = self._arvados_node()
    if node_record is not None:
        self._later.clean_arvados_node(node_record)
    else:
        self._finished(success_flag=True)
+
@ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def clean_arvados_node(self, arvados_node):
    """Blank the given Arvados node record, then finish successfully.

    The record is passed to _clean_arvados_node with the explanation
    "Shut down by Node Manager"; afterwards the actor finishes with
    success_flag=True.
    NOTE(review): presumably the _retry decorator re-runs this method on
    config.ARVADOS_ERRORS; its definition is not visible here.
    """
    explanation = "Shut down by Node Manager"
    self._clean_arvados_node(arvados_node, explanation)
    self._finished(success_flag=True)
# Make the decorator available to subclasses.
# It is applied above as a plain function (on shutdown_node); rebinding
# the name to a staticmethod happens only after that use.
_stop_if_window_closed = staticmethod(_stop_if_window_closed)