# Copyright (C) The Arvados Authors. All rights reserved.
# SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
13 from .config import actor_class
15 def _notify_subscribers(response, subscribers):
16 """Send the response to all the subscriber methods.
18 If any of the subscriber actors have stopped, remove them from the
21 dead_subscribers = set()
22 for subscriber in subscribers:
25 except pykka.ActorDeadError:
26 dead_subscribers.add(subscriber)
27 subscribers.difference_update(dead_subscribers)
class RemotePollLoopActor(actor_class):
    """Abstract actor class to regularly poll a remote service.

    This actor sends regular requests to a remote service, and sends each
    response to subscribers.  It takes care of error handling, and retrying
    requests with exponential backoff.

    To use this actor, define the _send_request method.  If you also
    define an _item_key method, this class will support subscribing to
    a specific item by key in responses.
    """
    def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
        """Set up polling state.

        Arguments:
          client: handle for the remote service.  Stored as self._client;
            presumably used by subclasses' _send_request -- confirm.
          timer_actor: object whose schedule(when, callable, *args) method
            is used to queue the next poll.
          poll_wait: seconds between polls after a successful response.
          max_poll_wait: upper bound, in seconds, for the backed-off wait
            after consecutive errors.
        """
        super(RemotePollLoopActor, self).__init__()
        self._client = client
        self._timer = timer_actor
        self._later = self.actor_ref.tell_proxy()
        self._polling_started = False
        self.min_poll_wait = poll_wait
        self.max_poll_wait = max_poll_wait
        self.poll_wait = self.min_poll_wait
        self.all_subscribers = set()
        self.key_subscribers = {}
        if hasattr(self, '_item_key'):
            # Only expose keyed subscription when the subclass can map a
            # response item to its key.
            self.subscribe_to = self._subscribe_to
        # NOTE(review): id() is applied to a freshly sliced string, so the
        # numeric suffix is not derived from the URN text itself; kept
        # as-is so existing logger names do not change.
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, id(self.actor_urn[9:])))

    def _start_polling(self):
        """Queue the first poll, once, no matter how often this is called."""
        if not self._polling_started:
            self._polling_started = True
            # Go through the tell proxy, the same route poll() uses to
            # reschedule itself.
            self._later.poll()

    def subscribe(self, subscriber):
        """Register subscriber to be called with every response."""
        self.all_subscribers.add(subscriber)
        self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
        self._start_polling()

    # __init__ exposes this method to the proxy if the subclass defines
    # _item_key.
    def _subscribe_to(self, key, subscriber):
        """Register subscriber to be called with the item matching key."""
        self.key_subscribers.setdefault(key, set()).add(subscriber)
        self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
        self._start_polling()

    def _send_request(self):
        """Perform one request and return its response.  Must be overridden."""
        raise NotImplementedError("subclasses must implement request method")

    def _got_response(self, response):
        """Reset the backoff and fan the response out to subscribers.

        Subscribers to everything receive the whole response.  Keyed
        subscribers receive the item whose key matches, or None when the
        response contains no such item.
        """
        self.poll_wait = self.min_poll_wait
        _notify_subscribers(response, self.all_subscribers)
        if hasattr(self, '_item_key'):
            items = {self._item_key(x): x for x in response}
            # items() rather than iteritems() so this runs on both
            # Python 2 and Python 3.
            for key, subscribers in self.key_subscribers.items():
                _notify_subscribers(items.get(key), subscribers)

    def _got_error(self, error):
        """Double the poll wait (capped at max_poll_wait); return a log message."""
        self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
        return "got error: {} - will try again in {} seconds".format(
            error, self.poll_wait)

    def is_common_error(self, exception):
        """Return True if exception is routine enough to log without a traceback.

        The default treats every error as uncommon; subclasses may
        override.  NOTE(review): default return value reconstructed --
        confirm against the original source.
        """
        return False

    def poll(self, scheduled_start=None):
        """Send one request, dispatch the outcome, and schedule the next poll.

        Arguments:
          scheduled_start: the time (seconds since the epoch) this poll was
            scheduled for.  Defaults to the current time.
        """
        self._logger.debug("sending request")
        start_time = time.time()
        if scheduled_start is None:
            scheduled_start = start_time
        try:
            response = self._send_request()
        except Exception as error:
            errmsg = self._got_error(error)
            if self.is_common_error(error):
                self._logger.warning(errmsg)
            else:
                self._logger.exception(errmsg)
            # After a failure, back off relative to when this attempt
            # actually started.
            next_poll = start_time + self.poll_wait
        else:
            self._got_response(response)
            # After a success, stay on the original schedule.
            next_poll = scheduled_start + self.poll_wait
            self._logger.info("got response with %d items in %s seconds, next poll at %s",
                              len(response), (time.time() - scheduled_start),
                              time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
        end_time = time.time()
        if next_poll < end_time:  # We've drifted too much; start fresh.
            next_poll = end_time + self.poll_wait
        self._timer.schedule(next_poll, self._later.poll, next_poll)