X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8709ef45c968e79b5e5978759484b994d4789aea..debf08b0415cf0f9d35338cfb280bde8628619eb:/services/nodemanager/arvnodeman/clientactor.py diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py deleted file mode 100644 index e5534c53f7..0000000000 --- a/services/nodemanager/arvnodeman/clientactor.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python - -from __future__ import absolute_import, print_function - -import logging -import time - -import pykka - -from .config import actor_class - -def _notify_subscribers(response, subscribers): - """Send the response to all the subscriber methods. - - If any of the subscriber actors have stopped, remove them from the - subscriber set. - """ - dead_subscribers = set() - for subscriber in subscribers: - try: - subscriber(response) - except pykka.ActorDeadError: - dead_subscribers.add(subscriber) - subscribers.difference_update(dead_subscribers) - -class RemotePollLoopActor(actor_class): - """Abstract actor class to regularly poll a remote service. - - This actor sends regular requests to a remote service, and sends each - response to subscribers. It takes care of error handling, and retrying - requests with exponential backoff. - - To use this actor, define the _send_request method. If you also - define an _item_key method, this class will support subscribing to - a specific item by key in responses. - """ - def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180): - super(RemotePollLoopActor, self).__init__() - self._client = client - self._timer = timer_actor - self._later = self.actor_ref.proxy() - self._polling_started = False - self.min_poll_wait = poll_wait - self.max_poll_wait = max_poll_wait - self.poll_wait = self.min_poll_wait - self.all_subscribers = set() - self.key_subscribers = {} - if hasattr(self, '_item_key'): - self.subscribe_to = self._subscribe_to - - def on_start(self): - self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, id(self.actor_urn[9:]))) - - def _start_polling(self): - if not self._polling_started: - self._polling_started = True - self._later.poll() - - def subscribe(self, subscriber): - self.all_subscribers.add(subscriber) - self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn) - self._start_polling() - - # __init__ exposes this method to the proxy if the subclass defines - # _item_key. - def _subscribe_to(self, key, subscriber): - self.key_subscribers.setdefault(key, set()).add(subscriber) - self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key) - self._start_polling() - - def _send_request(self): - raise NotImplementedError("subclasses must implement request method") - - def _got_response(self, response): - self._logger.debug("got response with %d items", len(response)) - self.poll_wait = self.min_poll_wait - _notify_subscribers(response, self.all_subscribers) - if hasattr(self, '_item_key'): - items = {self._item_key(x): x for x in response} - for key, subscribers in self.key_subscribers.iteritems(): - _notify_subscribers(items.get(key), subscribers) - - def _got_error(self, error): - self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait) - return "got error: {} - will try again in {} seconds".format( - error, self.poll_wait) - - def is_common_error(self, exception): - return False - - def poll(self, scheduled_start=None): - self._logger.debug("sending request") - start_time = time.time() - if scheduled_start is None: - scheduled_start = start_time - try: - response = self._send_request() - except Exception as error: - errmsg = self._got_error(error) - if self.is_common_error(error): - self._logger.warning(errmsg) - else: - self._logger.exception(errmsg) - next_poll = start_time + self.poll_wait - else: - self._got_response(response) - next_poll = scheduled_start + self.poll_wait - self._logger.info("request took %s seconds", (time.time() - scheduled_start)) - end_time = time.time() - if next_poll < end_time: # We've drifted too much; start fresh. - next_poll = end_time + self.poll_wait - self._timer.schedule(next_poll, self._later.poll, next_poll)