3 from __future__ import absolute_import, print_function
10 from .config import actor_class
12 def _notify_subscribers(response, subscribers):
13 """Send the response to all the subscriber methods.
15 If any of the subscriber actors have stopped, remove them from the
18 dead_subscribers = set()
19 for subscriber in subscribers:
22 except pykka.ActorDeadError:
23 dead_subscribers.add(subscriber)
24 subscribers.difference_update(dead_subscribers)
26 class RemotePollLoopActor(actor_class):
27 """Abstract actor class to regularly poll a remote service.
29 This actor sends regular requests to a remote service, and sends each
30 response to subscribers. It takes care of error handling, and retrying
31 requests with exponential backoff.
33 To use this actor, define CLIENT_ERRORS and the _send_request method.
34 If you also define an _item_key method, this class will support
35 subscribing to a specific item by key in responses.
def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
    """Initialize the polling state.  No request is sent from here.

    Arguments:
    * client: stored on the instance as self._client.
    * timer_actor: stored as self._timer; used to schedule later polls.
    * poll_wait: the starting wait between polls, in seconds.  It is also
      the value the wait is reset to after a successful response.
    * max_poll_wait: upper bound for the wait once errors make it grow.
    """
    super(RemotePollLoopActor, self).__init__()
    # Keep the client next to the timer; previously the argument was
    # accepted and then dropped.
    # NOTE(review): the attribute name mirrors self._timer -- confirm it is
    # the one _send_request implementations read.
    self._client = client
    self._timer = timer_actor
    self._logger = logging.getLogger(self.LOGGER_NAME)
    self._later = self.actor_ref.proxy()
    self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
    self.min_poll_wait = poll_wait
    self.max_poll_wait = max_poll_wait
    self.poll_wait = self.min_poll_wait
    # None means that polling has not been started yet.
    self.last_poll_time = None
    self.all_subscribers = set()
    self.key_subscribers = {}
    # subscribe_to is only made available when the subclass provides
    # _item_key, because per-key delivery depends on it.
    if hasattr(self, '_item_key'):
        self.subscribe_to = self._subscribe_to
53 def _start_polling(self):
54 if self.last_poll_time is None:
55 self.last_poll_time = time.time()
def subscribe(self, subscriber):
    """Register subscriber to be sent every response this actor gets."""
    registry = self.all_subscribers
    registry.add(subscriber)
    self._logger.debug("%r subscribed to all events", subscriber)
# __init__ exposes this method to the proxy, under the name subscribe_to,
# if the subclass defines _item_key.
65 def _subscribe_to(self, key, subscriber):
66 self.key_subscribers.setdefault(key, set()).add(subscriber)
67 self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
70 def _send_request(self):
71 raise NotImplementedError("subclasses must implement request method")
def _got_response(self, response):
    """Handle a successful poll.

    Resets the poll wait to its minimum, sends the whole response to the
    subscribers of all events and, when the subclass defines _item_key,
    sends each per-key subscriber the item matching its key.

    Arguments:
    * response: a sized iterable of items (len() is taken of it, and it is
      iterated when _item_key is defined).
    """
    self._logger.debug("%s got response with %d items",
                       self.log_prefix, len(response))
    # A success ends any error backoff.
    self.poll_wait = self.min_poll_wait
    _notify_subscribers(response, self.all_subscribers)
    if hasattr(self, '_item_key'):
        items = {self._item_key(x): x for x in response}
        # items() is available on both Python 2 and 3; iteritems() exists
        # on Python 2 only.
        for key, subscribers in self.key_subscribers.items():
            # A key that is missing from this response is delivered as None.
            _notify_subscribers(items.get(key), subscribers)
83 def _got_error(self, error):
84 self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
85 self._logger.warning("%s got error: %s - waiting %s seconds",
86 self.log_prefix, error, self.poll_wait)
89 self._logger.debug("%s sending poll", self.log_prefix)
90 start_time = time.time()
92 response = self._send_request()
93 except self.CLIENT_ERRORS as error:
94 self.last_poll_time = start_time
95 self._got_error(error)
97 self.last_poll_time += self.poll_wait
98 self._got_response(response)
99 self._timer.schedule(self.last_poll_time + self.poll_wait,