Merge branch '9120-node-manager-search-ex-methods-wip'
[arvados.git] / services / nodemanager / arvnodeman / clientactor.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import logging
6 import time
7
8 import pykka
9
10 from .config import actor_class
11
12 def _notify_subscribers(response, subscribers):
13     """Send the response to all the subscriber methods.
14
15     If any of the subscriber actors have stopped, remove them from the
16     subscriber set.
17     """
18     dead_subscribers = set()
19     for subscriber in subscribers:
20         try:
21             subscriber(response)
22         except pykka.ActorDeadError:
23             dead_subscribers.add(subscriber)
24     subscribers.difference_update(dead_subscribers)
25
class RemotePollLoopActor(actor_class):
    """Abstract actor class to regularly poll a remote service.

    This actor sends regular requests to a remote service, and sends each
    response to subscribers.  It takes care of error handling, and retrying
    requests with exponential backoff.

    To use this actor, define the _send_request method.  If you also
    define an _item_key method, this class will support subscribing to
    a specific item by key in responses.
    """
    def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
        """Initialize polling state.  No request is sent until something
        subscribes.

        client -- stored as self._client; this base class never reads it,
            so it is there for subclasses' _send_request to use.
        timer_actor -- object whose schedule(time, callable, *args) method
            is used to arrange the next poll.
        poll_wait -- seconds between polls after a successful response.
        max_poll_wait -- upper bound for the wait reached by backoff
            after repeated errors.
        """
        super(RemotePollLoopActor, self).__init__()
        self._client = client
        self._timer = timer_actor
        # Proxy used to queue calls back to this actor.
        # NOTE(review): tell_proxy() is not defined in this file;
        # presumably it comes from actor_class's actor_ref -- confirm.
        self._later = self.actor_ref.tell_proxy()
        self._polling_started = False
        self.min_poll_wait = poll_wait
        self.max_poll_wait = max_poll_wait
        self.poll_wait = self.min_poll_wait
        self.all_subscribers = set()
        self.key_subscribers = {}
        # Only expose keyed subscription when the subclass can compute
        # item keys.
        if hasattr(self, '_item_key'):
            self.subscribe_to = self._subscribe_to

    def on_start(self):
        """Create this actor's logger, named '<ClassName>.<URN suffix>'."""
        # Use the URN text itself, not id() of it: id() of a temporary
        # string is only a memory address, which may be reused, so two
        # actors could otherwise end up with the same logger name.
        # NOTE(review): [9:] presumably strips a "urn:uuid:" prefix --
        # confirm against the actor_urn format.
        self._logger = logging.getLogger(
            "%s.%s" % (self.__class__.__name__, self.actor_urn[9:]))

    def _start_polling(self):
        """Queue the first poll, unless polling has already been started."""
        if not self._polling_started:
            self._polling_started = True
            self._later.poll()

    def subscribe(self, subscriber):
        """Register `subscriber` to be called with every response, and
        make sure polling is running.

        `subscriber` must be callable with one argument and expose
        actor_ref.actor_urn (used for the debug log line).
        """
        self.all_subscribers.add(subscriber)
        self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
        self._start_polling()

    # __init__ exposes this method to the proxy if the subclass defines
    # _item_key.
    def _subscribe_to(self, key, subscriber):
        """Register `subscriber` for the response item whose key is `key`,
        and make sure polling is running.
        """
        self.key_subscribers.setdefault(key, set()).add(subscriber)
        self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
        self._start_polling()

    def _send_request(self):
        """Perform one request and return the response.  Subclasses must
        override this; the base implementation raises NotImplementedError.
        """
        raise NotImplementedError("subclasses must implement request method")

    def _got_response(self, response):
        """Handle a successful response: reset the backoff and notify
        subscribers.

        Subscribers to all events receive the whole response.  If the
        subclass defines _item_key, each keyed subscriber receives the
        item with its key, or None when no item in the response has it.
        """
        self.poll_wait = self.min_poll_wait
        _notify_subscribers(response, self.all_subscribers)
        if hasattr(self, '_item_key'):
            items = {self._item_key(x): x for x in response}
            # items() rather than iteritems(): the latter exists only on
            # Python 2, while items() works on both major versions.
            for key, subscribers in self.key_subscribers.items():
                _notify_subscribers(items.get(key), subscribers)

    def _got_error(self, error):
        """Double the poll wait (capped at max_poll_wait) and return a
        log message describing the error and the new wait.
        """
        self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
        return "got error: {} - will try again in {} seconds".format(
            error, self.poll_wait)

    def is_common_error(self, exception):
        """Return True if `exception` should be logged by poll() as a
        plain warning instead of with a traceback.  The base class
        always returns False.
        """
        return False

    def poll(self, scheduled_start=None):
        """Send one request, dispatch the outcome, and schedule the next
        poll with the timer.

        scheduled_start -- the time this poll was scheduled for; defaults
            to the current time.
        """
        self._logger.debug("sending request")
        start_time = time.time()
        if scheduled_start is None:
            scheduled_start = start_time
        try:
            response = self._send_request()
        except Exception as error:
            errmsg = self._got_error(error)
            if self.is_common_error(error):
                self._logger.warning(errmsg)
            else:
                self._logger.exception(errmsg)
            # After a failure, back off relative to when this attempt
            # actually began.
            next_poll = start_time + self.poll_wait
        else:
            self._got_response(response)
            # After a success, keep the cadence anchored to the scheduled
            # time rather than to when the request finished.
            next_poll = scheduled_start + self.poll_wait
            self._logger.info("got response with %d items in %s seconds, next poll at %s",
                              len(response), (time.time() - scheduled_start),
                              time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
        end_time = time.time()
        if next_poll < end_time:  # We've drifted too much; start fresh.
            next_poll = end_time + self.poll_wait
        # The third argument is passed on so the next poll() receives its
        # own scheduled time as scheduled_start.
        self._timer.schedule(next_poll, self._later.poll, next_poll)