Merge branch '13301-cwl-resource-scatter' closes #13301
[arvados.git] / services / nodemanager / arvnodeman / clientactor.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import logging
9 import time
10
11 import pykka
12
13 from .config import actor_class
14
15 def _notify_subscribers(response, subscribers):
16     """Send the response to all the subscriber methods.
17
18     If any of the subscriber actors have stopped, remove them from the
19     subscriber set.
20     """
21     dead_subscribers = set()
22     for subscriber in subscribers:
23         try:
24             subscriber(response)
25         except pykka.ActorDeadError:
26             dead_subscribers.add(subscriber)
27     subscribers.difference_update(dead_subscribers)
28
29 class RemotePollLoopActor(actor_class):
30     """Abstract actor class to regularly poll a remote service.
31
32     This actor sends regular requests to a remote service, and sends each
33     response to subscribers.  It takes care of error handling, and retrying
34     requests with exponential backoff.
35
36     To use this actor, define the _send_request method.  If you also
37     define an _item_key method, this class will support subscribing to
38     a specific item by key in responses.
39     """
40     def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
41         super(RemotePollLoopActor, self).__init__()
42         self._client = client
43         self._timer = timer_actor
44         self._later = self.actor_ref.tell_proxy()
45         self._polling_started = False
46         self.min_poll_wait = poll_wait
47         self.max_poll_wait = max_poll_wait
48         self.poll_wait = self.min_poll_wait
49         self.all_subscribers = set()
50         self.key_subscribers = {}
51         if hasattr(self, '_item_key'):
52             self.subscribe_to = self._subscribe_to
53
54     def on_start(self):
55         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, id(self.actor_urn[9:])))
56
57     def _start_polling(self):
58         if not self._polling_started:
59             self._polling_started = True
60             self._later.poll()
61
62     def subscribe(self, subscriber):
63         self.all_subscribers.add(subscriber)
64         self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
65         self._start_polling()
66
67     # __init__ exposes this method to the proxy if the subclass defines
68     # _item_key.
69     def _subscribe_to(self, key, subscriber):
70         self.key_subscribers.setdefault(key, set()).add(subscriber)
71         self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
72         self._start_polling()
73
74     def _send_request(self):
75         raise NotImplementedError("subclasses must implement request method")
76
77     def _got_response(self, response):
78         self.poll_wait = self.min_poll_wait
79         _notify_subscribers(response, self.all_subscribers)
80         if hasattr(self, '_item_key'):
81             items = {self._item_key(x): x for x in response}
82             for key, subscribers in self.key_subscribers.iteritems():
83                 _notify_subscribers(items.get(key), subscribers)
84
85     def _got_error(self, error):
86         self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
87         return "got error: {} - will try again in {} seconds".format(
88             error, self.poll_wait)
89
90     def is_common_error(self, exception):
91         return False
92
93     def poll(self, scheduled_start=None):
94         self._logger.debug("sending request")
95         start_time = time.time()
96         if scheduled_start is None:
97             scheduled_start = start_time
98         try:
99             response = self._send_request()
100         except Exception as error:
101             errmsg = self._got_error(error)
102             if self.is_common_error(error):
103                 self._logger.warning(errmsg)
104             else:
105                 self._logger.exception(errmsg)
106             next_poll = start_time + self.poll_wait
107         else:
108             self._got_response(response)
109             next_poll = scheduled_start + self.poll_wait
110             self._logger.info("got response with %d items in %s seconds, next poll at %s",
111                               len(response), (time.time() - scheduled_start),
112                               time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
113         end_time = time.time()
114         if next_poll < end_time:  # We've drifted too much; start fresh.
115             next_poll = end_time + self.poll_wait
116         self._timer.schedule(next_poll, self._later.poll, next_poll)