X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f41ecc930361f296938308e46748c92407ecd812..5bcba288077488791daa43a15d5fd5fb0c6e653c:/services/nodemanager/arvnodeman/clientactor.py diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py index 77d85d640c..6319f4bbfc 100644 --- a/services/nodemanager/arvnodeman/clientactor.py +++ b/services/nodemanager/arvnodeman/clientactor.py @@ -30,9 +30,9 @@ class RemotePollLoopActor(actor_class): response to subscribers. It takes care of error handling, and retrying requests with exponential backoff. - To use this actor, define CLIENT_ERRORS and the _send_request method. - If you also define an _item_key method, this class will support - subscribing to a specific item by key in responses. + To use this actor, define the _send_request method. If you also + define an _item_key method, this class will support subscribing to + a specific item by key in responses. """ def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180): super(RemotePollLoopActor, self).__init__() @@ -40,18 +40,19 @@ class RemotePollLoopActor(actor_class): self._timer = timer_actor self._logger = logging.getLogger(self.LOGGER_NAME) self._later = self.actor_ref.proxy() + self._polling_started = False + self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self)) self.min_poll_wait = poll_wait self.max_poll_wait = max_poll_wait self.poll_wait = self.min_poll_wait - self.last_poll_time = None self.all_subscribers = set() self.key_subscribers = {} if hasattr(self, '_item_key'): self.subscribe_to = self._subscribe_to def _start_polling(self): - if self.last_poll_time is None: - self.last_poll_time = time.time() + if not self._polling_started: + self._polling_started = True self._later.poll() def subscribe(self, subscriber): @@ -70,6 +71,8 @@ class RemotePollLoopActor(actor_class): raise NotImplementedError("subclasses must implement request method") def _got_response(self, response): + self._logger.debug("%s got response with %d items", + self.log_prefix, len(response)) self.poll_wait = self.min_poll_wait _notify_subscribers(response, self.all_subscribers) if hasattr(self, '_item_key'): @@ -79,18 +82,30 @@ class RemotePollLoopActor(actor_class): def _got_error(self, error): self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait) - self._logger.warning("Client error: %s - waiting %s seconds", - error, self.poll_wait) + return "{} got error: {} - waiting {} seconds".format( + self.log_prefix, error, self.poll_wait) - def poll(self): + def is_common_error(self, exception): + return False + + def poll(self, scheduled_start=None): + self._logger.debug("%s sending poll", self.log_prefix) start_time = time.time() + if scheduled_start is None: + scheduled_start = start_time try: response = self._send_request() - except self.CLIENT_ERRORS as error: - self.last_poll_time = start_time - self._got_error(error) + except Exception as error: + errmsg = self._got_error(error) + if self.is_common_error(error): + self._logger.warning(errmsg) + else: + self._logger.exception(errmsg) + next_poll = start_time + self.poll_wait else: - self.last_poll_time += self.poll_wait self._got_response(response) - self._timer.schedule(self.last_poll_time + self.poll_wait, - self._later.poll) + next_poll = scheduled_start + self.poll_wait + end_time = time.time() + if next_poll < end_time: # We've drifted too much; start fresh. + next_poll = end_time + self.poll_wait + self._timer.schedule(next_poll, self._later.poll, next_poll)