X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e9901a3f94eab93f9482b7cb3d7e2c1b50216f08..5768e5a7559c22f86d89be65245b0d269f06cc6c:/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py index 45468c31d7..4848289e8b 100644 --- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py +++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py @@ -24,30 +24,29 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin): def __init__(self, cloud_client, arvados_client, timer_actor, retry_wait, max_retry_wait): super(ComputeNodeStateChangeBase, self).__init__() - self._later = self.actor_ref.proxy() + RetryMixin.__init__(self, retry_wait, max_retry_wait, + None, cloud_client, timer_actor) + self._later = self.actor_ref.tell_proxy() self._arvados = arvados_client self.subscribers = set() def _set_logger(self): - self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[9:])) + self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:])) def on_start(self): self._set_logger() - RetryMixin.__init__(self, - retry_wait, - max_retry_wait, - self._logger, - cloud_client, - timer_actor) def _finished(self): - _notify_subscribers(self._later, self.subscribers) + if self.subscribers is None: + raise Exception("Actor tried to finish twice") + _notify_subscribers(self.actor_ref.proxy(), self.subscribers) self.subscribers = None + self._logger.info("finished") def subscribe(self, subscriber): if self.subscribers is None: try: - subscriber(self._later) + subscriber(self.actor_ref.proxy()) except pykka.ActorDeadError: pass else: @@ -66,6 +65,17 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin): 'last_action': explanation}}, ).execute() + @staticmethod + def _finish_on_exception(orig_func): + @functools.wraps(orig_func) + def finish_wrapper(self, *args, **kwargs): + try: + return orig_func(self, *args, **kwargs) + except Exception as error: + self._logger.error("Actor error %s", error) + self._finished() + return finish_wrapper + class ComputeNodeSetupActor(ComputeNodeStateChangeBase): """Actor to create and set up a cloud compute node. @@ -90,20 +100,23 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase): else: self._later.prepare_arvados_node(arvados_node) + @ComputeNodeStateChangeBase._finish_on_exception @RetryMixin._retry(config.ARVADOS_ERRORS) def create_arvados_node(self): self.arvados_node = self._arvados.nodes().create(body={}).execute() self._later.create_cloud_node() + @ComputeNodeStateChangeBase._finish_on_exception @RetryMixin._retry(config.ARVADOS_ERRORS) def prepare_arvados_node(self, node): self.arvados_node = self._clean_arvados_node( node, "Prepared by Node Manager") self._later.create_cloud_node() + @ComputeNodeStateChangeBase._finish_on_exception @RetryMixin._retry() def create_cloud_node(self): - self._logger.info("Creating cloud node of size %s.", + self._logger.info("Sending create_node request for node size %s.", self.cloud_size.name) self.cloud_node = self._cloud.create_node(self.cloud_size, self.arvados_node) @@ -112,6 +125,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase): self._logger.info("Cloud node %s created.", self.cloud_node.id) self._later.update_arvados_node_properties() + @ComputeNodeStateChangeBase._finish_on_exception @RetryMixin._retry(config.ARVADOS_ERRORS) def update_arvados_node_properties(self): """Tell Arvados some details about the cloud node. @@ -175,7 +189,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): self.success = None def _set_logger(self): - self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[9:], self.cloud_node.id)) + self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name)) def on_start(self): super(ComputeNodeShutdownActor, self).on_start() @@ -191,38 +205,40 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): def cancel_shutdown(self, reason): self.cancel_reason = reason - self._logger.info("Shutdown cancelled: %s.", - self.cloud_node.id, reason) + self._logger.info("Shutdown cancelled: %s.", reason) self._finished(success_flag=False) def _stop_if_window_closed(orig_func): @functools.wraps(orig_func) def stop_wrapper(self, *args, **kwargs): if (self.cancellable and - (not self._monitor.shutdown_eligible().get())): + (self._monitor.shutdown_eligible().get() is not True)): self._later.cancel_shutdown(self.WINDOW_CLOSED) return None else: return orig_func(self, *args, **kwargs) return stop_wrapper + @ComputeNodeStateChangeBase._finish_on_exception @_stop_if_window_closed @RetryMixin._retry() def shutdown_node(self): - self._logger.info("Starting shutdown", self.cloud_node.id) + self._logger.info("Starting shutdown") if not self._cloud.destroy_node(self.cloud_node): if self._cloud.broken(self.cloud_node): self._later.cancel_shutdown(self.NODE_BROKEN) + return else: # Force a retry. raise cloud_types.LibcloudError("destroy_node failed") - self._logger.info("Shutdown success", self.cloud_node.id) + self._logger.info("Shutdown success") arv_node = self._arvados_node() if arv_node is None: self._finished(success_flag=True) else: self._later.clean_arvados_node(arv_node) + @ComputeNodeStateChangeBase._finish_on_exception @RetryMixin._retry(config.ARVADOS_ERRORS) def clean_arvados_node(self, arvados_node): self._clean_arvados_node(arvados_node, "Shut down by Node Manager") @@ -291,7 +307,7 @@ class ComputeNodeMonitorActor(config.actor_class): boot_fail_after=1800 ): super(ComputeNodeMonitorActor, self).__init__() - self._later = self.actor_ref.proxy() + self._later = self.actor_ref.tell_proxy() self._last_log = None self._shutdowns = shutdown_timer self._cloud_node_fqdn = cloud_fqdn_func @@ -310,10 +326,11 @@ class ComputeNodeMonitorActor(config.actor_class): self._later.consider_shutdown() def _set_logger(self): - self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[9:], self.cloud_node.name)) + self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name)) def on_start(self): self._set_logger() + self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown) def subscribe(self, subscriber): self.subscribers.add(subscriber) @@ -341,43 +358,55 @@ class ComputeNodeMonitorActor(config.actor_class): return result def shutdown_eligible(self): + """Return True if eligible for shutdown, or a string explaining why the node + is not eligible for shutdown.""" + if not self._shutdowns.window_open(): - return False + return "shutdown window is not open." if self.arvados_node is None: # Node is unpaired. # If it hasn't pinged Arvados after boot_fail seconds, shut it down - return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after) + if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after): + return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after)) + else: + return True missing = arvados_node_missing(self.arvados_node, self.node_stale_after) if missing and self._cloud.broken(self.cloud_node): # Node is paired, but Arvados says it is missing and the cloud says the node # is in an error state, so shut it down. return True if missing is None and self._cloud.broken(self.cloud_node): - self._logger.warning( - "cloud reports broken node, but paired node %s never pinged " - "(bug?) -- skipped check for node_stale_after", + self._logger.info( + "Cloud node considered 'broken' but paired node %s last_ping_at is None, " + + "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).", self.arvados_node['uuid']) - return self.in_state('idle') + if self.in_state('idle'): + return True + else: + return "node is not idle." def consider_shutdown(self): - next_opening = self._shutdowns.next_opening() - if self.shutdown_eligible(): - self._debug("Suggesting shutdown.", self.cloud_node.id) - _notify_subscribers(self._later, self.subscribers) - elif self._shutdowns.window_open(): - self._debug("Shutdown window open but node busy.", - self.cloud_node.id) - elif self.last_shutdown_opening != next_opening: - self._debug("Shutdown window closed. Next at %s.", - self.cloud_node.id, time.ctime(next_opening)) - self._timer.schedule(next_opening, self._later.consider_shutdown) - self.last_shutdown_opening = next_opening + try: + next_opening = self._shutdowns.next_opening() + eligible = self.shutdown_eligible() + if eligible is True: + self._debug("Suggesting shutdown.") + _notify_subscribers(self.actor_ref.proxy(), self.subscribers) + elif self._shutdowns.window_open(): + self._debug("Cannot shut down because %s", eligible) + elif self.last_shutdown_opening != next_opening: + self._debug("Shutdown window closed. Next at %s.", + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening))) + self._timer.schedule(next_opening, self._later.consider_shutdown) + self.last_shutdown_opening = next_opening + except Exception: + self._logger.exception("Unexpected exception") def offer_arvados_pair(self, arvados_node): first_ping_s = arvados_node.get('first_ping_at') if (self.arvados_node is not None) or (not first_ping_s): return None - elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and + elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)): self._later.update_arvados_node(arvados_node) return self.cloud_node.id