def __init__(self, cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait):
super(ComputeNodeStateChangeBase, self).__init__()
- self._later = self.actor_ref.proxy()
+ RetryMixin.__init__(self, retry_wait, max_retry_wait,
+ None, cloud_client, timer_actor)  # None occupies the logger position; self._logger is assigned separately by _set_logger()
+ self._later = self.actor_ref.tell_proxy()  # NOTE(review): presumably a proxy whose calls do not return futures -- confirm against tell_proxy's definition
self._arvados = arvados_client
self.subscribers = set()
def _set_logger(self):
- self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[9:]))
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))  # name is <class name>.<actor_urn with its first 33 characters dropped>; presumably leaves a short unique suffix of the URN -- TODO confirm URN layout
def on_start(self):
self._set_logger()
- RetryMixin.__init__(self,
- retry_wait,
- max_retry_wait,
- self._logger,
- cloud_client,
- timer_actor)
def _finished(self):
- _notify_subscribers(self._later, self.subscribers)
+ if self.subscribers is None:  # subscribers is reset to None below, so None here means _finished() already ran
+ raise Exception("Actor tried to finish twice")
+ _notify_subscribers(self.actor_ref.proxy(), self.subscribers)  # subscribers are handed actor_ref.proxy(), not the tell_proxy held in self._later
self.subscribers = None
+ self._logger.info("finished")
def subscribe(self, subscriber):
if self.subscribers is None:
try:
- subscriber(self._later)
+ subscriber(self.actor_ref.proxy())
except pykka.ActorDeadError:
pass
else:
'last_action': explanation}},
).execute()
+ @staticmethod
+ def _finish_on_exception(orig_func):  # decorator for actor methods: on any Exception, log it and call self._finished()
+ @functools.wraps(orig_func)
+ def finish_wrapper(self, *args, **kwargs):
+ try:
+ return orig_func(self, *args, **kwargs)
+ except Exception as error:  # the error is not re-raised; the wrapper returns None in this case
+ self._logger.error("Actor error %s", error)
+ self._finished()
+ return finish_wrapper
+
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
"""Actor to create and set up a cloud compute node.
else:
self._later.prepare_arvados_node(arvados_node)
+ @ComputeNodeStateChangeBase._finish_on_exception  # outermost wrapper: an exception escaping the _retry wrapper is logged and self._finished() is called
@RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
+ @ComputeNodeStateChangeBase._finish_on_exception  # outermost wrapper: an exception escaping the _retry wrapper is logged and self._finished() is called
@RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
self.arvados_node = self._clean_arvados_node(
node, "Prepared by Node Manager")
self._later.create_cloud_node()
+ @ComputeNodeStateChangeBase._finish_on_exception  # outermost wrapper: an exception escaping the _retry wrapper is logged and self._finished() is called
@RetryMixin._retry()
def create_cloud_node(self):
- self._logger.info("Creating cloud node of size %s.",
+ self._logger.info("Sending create_node request for node size %s.",  # logged before the driver call; success is logged separately after create_node returns
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
self._logger.info("Cloud node %s created.", self.cloud_node.id)
self._later.update_arvados_node_properties()
+ @ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def update_arvados_node_properties(self):
"""Tell Arvados some details about the cloud node.
self.success = None
def _set_logger(self):
- self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[9:], self.cloud_node.id))
+ self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))  # name is <class name>.<actor_urn with its first 33 characters dropped>.<cloud node name>
def on_start(self):
super(ComputeNodeShutdownActor, self).on_start()
def cancel_shutdown(self, reason):
self.cancel_reason = reason
- self._logger.info("Shutdown cancelled: %s.",
- self.cloud_node.id, reason)
+ self._logger.info("Shutdown cancelled: %s.", reason)  # only the reason is formatted; the logger name built in _set_logger already carries the cloud node name
self._finished(success_flag=False)
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
def stop_wrapper(self, *args, **kwargs):
if (self.cancellable and
- (not self._monitor.shutdown_eligible().get())):
+ (self._monitor.shutdown_eligible().get() is not True)):  # identity test: any answer other than True cancels; NOTE(review): presumably needed because a refusal arrives as a non-empty (truthy) explanation string -- confirm against the monitor's shutdown_eligible
self._later.cancel_shutdown(self.WINDOW_CLOSED)
return None
else:
return orig_func(self, *args, **kwargs)
return stop_wrapper
+ @ComputeNodeStateChangeBase._finish_on_exception  # outermost wrapper: an exception escaping _stop_if_window_closed and _retry is logged and self._finished() is called
@_stop_if_window_closed
@RetryMixin._retry()
def shutdown_node(self):
- self._logger.info("Starting shutdown", self.cloud_node.id)
+ self._logger.info("Starting shutdown")
if not self._cloud.destroy_node(self.cloud_node):
if self._cloud.broken(self.cloud_node):
self._later.cancel_shutdown(self.NODE_BROKEN)
+ return  # stop here so a cancelled shutdown does not fall through to the success path below
else:
# Force a retry.
raise cloud_types.LibcloudError("destroy_node failed")
- self._logger.info("Shutdown success", self.cloud_node.id)
+ self._logger.info("Shutdown success")
arv_node = self._arvados_node()
if arv_node is None:
self._finished(success_flag=True)
else:
self._later.clean_arvados_node(arv_node)
+ @ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def clean_arvados_node(self, arvados_node):
self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
this to perform maintenance tasks on themselves. Having a
dedicated actor for this gives us the opportunity to control the
flow of requests; e.g., by backing off when errors occur.
-
- This actor is most like a "traditional" Pykka actor: there's no
- subscribing, but instead methods return real driver results. If
- you're interested in those results, you should get them from the
- Future that the proxy method returns. Be prepared to handle exceptions
- from the cloud driver when you do.
"""
def __init__(self, cloud_factory, max_retry_wait=180):
super(ComputeNodeUpdateActor, self).__init__()
self.error_streak = 0
self.next_request_time = time.time()
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))  # name is <class name>.<actor_urn with its first 33 characters dropped>; presumably leaves a short unique suffix of the URN -- TODO confirm URN layout
+
+ def on_start(self):
+ self._set_logger()
+
def _throttle_errors(orig_func):
@functools.wraps(orig_func)
def throttle_wrapper(self, *args, **kwargs):
self.next_request_time = time.time()
try:
result = orig_func(self, *args, **kwargs)
- except Exception as error:
+ except self._cloud.CLOUD_ERRORS as error:
self.error_streak += 1
self.next_request_time += min(2 ** self.error_streak,
self.max_retry_wait)
- raise
+ self._logger.error(
+ "Caught cloud error (no retry): %s",
+ error, exc_info=error)
+ except Exception as error:
+ self._logger.error(
+ "Caught unknown error (no retry): %s",
+ error, exc_info=error)
else:
self.error_streak = 0
return result
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
- self._later = self.actor_ref.proxy()
+ self._later = self.actor_ref.tell_proxy()
self._last_log = None
self._shutdowns = shutdown_timer
self._cloud_node_fqdn = cloud_fqdn_func
self._later.consider_shutdown()
def _set_logger(self):
- self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[9:], self.cloud_node.name))
+ self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))  # name is <class name>.<actor_urn with its first 33 characters dropped>.<cloud node name>
def on_start(self):
self._set_logger()
+ self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)  # re-run consider_shutdown at start time + boot_fail_after, the moment shutdown_eligible stops treating an unpaired node as still booting
def subscribe(self, subscriber):
self.subscribers.add(subscriber)
return result
def shutdown_eligible(self):
+ """Return True if eligible for shutdown, or a string explaining why the node
+ is not eligible for shutdown."""
+
if not self._shutdowns.window_open():
- return False
+ return "shutdown window is not open."  # refusals are non-empty strings, which are truthy, so callers must compare the result with True
if self.arvados_node is None:
# Node is unpaired.
# If it hasn't pinged Arvados after boot_fail seconds, shut it down
- return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
+ if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
+ return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
+ else:
+ return True  # unpaired and the start timestamp is no longer fresh: eligible
missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
if missing and self._cloud.broken(self.cloud_node):
# Node is paired, but Arvados says it is missing and the cloud says the node
# is in an error state, so shut it down.
return True
if missing is None and self._cloud.broken(self.cloud_node):
- self._logger.warning(
- "cloud reports broken node, but paired node %s never pinged "
- "(bug?) -- skipped check for node_stale_after",
+ self._logger.info(
+ "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
+ "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
self.arvados_node['uuid'])
- return self.in_state('idle')
+ if self.in_state('idle'):  # reached by every paired node that was not returned above, including the logged case
+ return True
+ else:
+ return "node is not idle."
def consider_shutdown(self):
- next_opening = self._shutdowns.next_opening()
- if self.shutdown_eligible():
- self._debug("Suggesting shutdown.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- elif self._shutdowns.window_open():
- self._debug("Shutdown window open but node busy.",
- self.cloud_node.id)
- elif self.last_shutdown_opening != next_opening:
- self._debug("Shutdown window closed. Next at %s.",
- self.cloud_node.id, time.ctime(next_opening))
- self._timer.schedule(next_opening, self._later.consider_shutdown)
- self.last_shutdown_opening = next_opening
+ try:
+ next_opening = self._shutdowns.next_opening()
+ eligible = self.shutdown_eligible()  # True, or a string explaining why the node is not eligible
+ if eligible is True:  # identity test, because the explanation strings are truthy too
+ self._debug("Suggesting shutdown.")
+ _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
+ elif self._shutdowns.window_open():
+ self._debug("Cannot shut down because %s", eligible)
+ elif self.last_shutdown_opening != next_opening:  # schedule a wake-up only when next_opening differs from the one last recorded
+ self._debug("Shutdown window closed. Next at %s.",
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
+ self._timer.schedule(next_opening, self._later.consider_shutdown)
+ self.last_shutdown_opening = next_opening
+ except Exception:  # nothing is re-raised; the failure is only logged, with its traceback
+ self._logger.exception("Unexpected exception")
def offer_arvados_pair(self, arvados_node):
first_ping_s = arvados_node.get('first_ping_at')
if (self.arvados_node is not None) or (not first_ping_s):
return None
- elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+ elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
(arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
self._later.update_arvados_node(arvados_node)
return self.cloud_node.id