4139: Improve logging in Node Manager poll actors.
[arvados.git] / services / nodemanager / arvnodeman / clientactor.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import logging
6 import time
7
8 import pykka
9
10 from .config import actor_class
11
12 def _notify_subscribers(response, subscribers):
13     """Send the response to all the subscriber methods.
14
15     If any of the subscriber actors have stopped, remove them from the
16     subscriber set.
17     """
18     dead_subscribers = set()
19     for subscriber in subscribers:
20         try:
21             subscriber(response)
22         except pykka.ActorDeadError:
23             dead_subscribers.add(subscriber)
24     subscribers.difference_update(dead_subscribers)
25
class RemotePollLoopActor(actor_class):
    """Abstract actor class to regularly poll a remote service.

    This actor sends regular requests to a remote service, and sends each
    response to subscribers.  It takes care of error handling, and retrying
    requests with exponential backoff.

    To use this actor, define LOGGER_NAME, CLIENT_ERRORS, and the
    _send_request method.  LOGGER_NAME is passed to logging.getLogger, and
    CLIENT_ERRORS is the exception class (or tuple of classes) that poll
    treats as a failed request.  If you also define an _item_key method,
    this class will support subscribing to a specific item by key in
    responses.
    """
    def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
        """Set up the poll loop state; polling starts on first subscription.

        Arguments:
        * client: stored as self._client.  This class never uses it itself;
          presumably subclasses use it in _send_request.
        * timer_actor: object whose schedule(time, callback) method is used
          to arrange the next poll.
        * poll_wait: seconds to wait between polls while requests succeed.
        * max_poll_wait: upper bound, in seconds, on the wait between polls
          while backing off after errors.
        """
        super(RemotePollLoopActor, self).__init__()
        self._client = client
        self._timer = timer_actor
        self._logger = logging.getLogger(self.LOGGER_NAME)
        self._later = self.actor_ref.proxy()
        self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
        self.min_poll_wait = poll_wait
        self.max_poll_wait = max_poll_wait
        self.poll_wait = self.min_poll_wait
        self.last_poll_time = None
        # The time most recently handed to the timer for the next poll, or
        # None while no poll has been scheduled through the timer yet.
        self._next_poll_time = None
        self.all_subscribers = set()
        self.key_subscribers = {}
        if hasattr(self, '_item_key'):
            self.subscribe_to = self._subscribe_to

    def _start_polling(self):
        """Kick off the poll loop, unless it has already been started."""
        if self.last_poll_time is None:
            self.last_poll_time = time.time()
            self._later.poll()

    def subscribe(self, subscriber):
        """Register a callable to receive every response, and start polling."""
        self.all_subscribers.add(subscriber)
        self._logger.debug("%r subscribed to all events", subscriber)
        self._start_polling()

    # __init__ exposes this method to the proxy if the subclass defines
    # _item_key.
    def _subscribe_to(self, key, subscriber):
        """Register a callable for the response item with the given key.

        After each response, the subscriber is called with the item whose
        _item_key matches `key`, or with None if the response has no such
        item.  Subscribing also starts polling.
        """
        self.key_subscribers.setdefault(key, set()).add(subscriber)
        self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
        self._start_polling()

    def _send_request(self):
        """Make one request to the remote service and return its response.

        Subclasses must override this.  The response must support len() and
        iteration, because _got_response uses both.
        """
        raise NotImplementedError("subclasses must implement request method")

    def _got_response(self, response):
        """Reset the backoff and pass a successful response to subscribers."""
        self._logger.debug("%s got response with %d items",
                           self.log_prefix, len(response))
        self.poll_wait = self.min_poll_wait
        _notify_subscribers(response, self.all_subscribers)
        if hasattr(self, '_item_key'):
            items = {self._item_key(x): x for x in response}
            # items() rather than iteritems(): same behavior here, and it
            # does not tie this method to Python 2.
            for key, subscribers in self.key_subscribers.items():
                _notify_subscribers(items.get(key), subscribers)

    def _got_error(self, error):
        """Double the poll wait, up to max_poll_wait, and log the error."""
        self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
        self._logger.warning("%s got error: %s - waiting %s seconds",
                             self.log_prefix, error, self.poll_wait)

    def poll(self):
        """Send one request, handle the outcome, and schedule the next poll.

        Exceptions matching CLIENT_ERRORS are handled by _got_error, and the
        next poll is scheduled poll_wait seconds after this request started.
        After a success, the next poll is scheduled poll_wait seconds after
        the time this poll was scheduled for, so the loop doesn't drift by
        however long each request takes.  Other exceptions propagate.
        """
        self._logger.debug("%s sending poll", self.log_prefix)
        start_time = time.time()
        try:
            response = self._send_request()
        except self.CLIENT_ERRORS as error:
            self.last_poll_time = start_time
            self._got_error(error)
        else:
            if self._next_poll_time is not None:
                # This poll came from the timer: count from the time it was
                # scheduled for.
                self.last_poll_time = self._next_poll_time
            elif self.last_poll_time is None:
                # poll() was called before any subscription started the loop.
                self.last_poll_time = start_time
            # Otherwise this is the immediate first poll from
            # _start_polling.  last_poll_time is already its start time;
            # advancing it here would delay the second poll by an extra
            # poll_wait.
            self._got_response(response)
        self._next_poll_time = self.last_poll_time + self.poll_wait
        self._timer.schedule(self._next_poll_time, self._later.poll)