X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a87af13d2d3461470b89b0811629a077a70c1938..cafe405682669b7ebdec8db4ee083c3ca2761827:/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py index e11dcc77ba..2ddfb0a007 100644 --- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py +++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py @@ -39,14 +39,14 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin): def _finished(self): if self.subscribers is None: raise Exception("Actor tried to finish twice") - _notify_subscribers(self._later, self.subscribers) + _notify_subscribers(self.actor_ref.proxy(), self.subscribers) self.subscribers = None self._logger.info("finished") def subscribe(self, subscriber): if self.subscribers is None: try: - subscriber(self._later) + subscriber(self.actor_ref.proxy()) except pykka.ActorDeadError: pass else: @@ -256,12 +256,6 @@ class ComputeNodeUpdateActor(config.actor_class): this to perform maintenance tasks on themselves. Having a dedicated actor for this gives us the opportunity to control the flow of requests; e.g., by backing off when errors occur. - - This actor is most like a "traditional" Pykka actor: there's no - subscribing, but instead methods return real driver results. If - you're interested in those results, you should get them from the - Future that the proxy method returns. Be prepared to handle exceptions - from the cloud driver when you do. """ def __init__(self, cloud_factory, max_retry_wait=180): super(ComputeNodeUpdateActor, self).__init__() @@ -270,6 +264,12 @@ class ComputeNodeUpdateActor(config.actor_class): self.error_streak = 0 self.next_request_time = time.time() + def _set_logger(self): + self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:])) + + def on_start(self): + self._set_logger() + def _throttle_errors(orig_func): @functools.wraps(orig_func) def throttle_wrapper(self, *args, **kwargs): @@ -280,10 +280,12 @@ class ComputeNodeUpdateActor(config.actor_class): try: result = orig_func(self, *args, **kwargs) except Exception as error: - self.error_streak += 1 - self.next_request_time += min(2 ** self.error_streak, - self.max_retry_wait) - raise + if self._cloud.is_cloud_exception(error): + self.error_streak += 1 + self.next_request_time += min(2 ** self.error_streak, + self.max_retry_wait) + self._logger.warn( + "Unhandled exception: %s", error, exc_info=error) else: self.error_streak = 0 return result @@ -391,7 +393,7 @@ class ComputeNodeMonitorActor(config.actor_class): eligible = self.shutdown_eligible() if eligible is True: self._debug("Suggesting shutdown.") - _notify_subscribers(self._later, self.subscribers) + _notify_subscribers(self.actor_ref.proxy(), self.subscribers) elif self._shutdowns.window_open(): self._debug("Cannot shut down because %s", eligible) elif self.last_shutdown_opening != next_opening: @@ -406,7 +408,7 @@ class ComputeNodeMonitorActor(config.actor_class): first_ping_s = arvados_node.get('first_ping_at') if (self.arvados_node is not None) or (not first_ping_s): return None - elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and + elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)): self._later.update_arvados_node(arvados_node) return self.cloud_node.id