4434: Merge branch 'master' into 4434-collation
[arvados.git] / services / nodemanager / arvnodeman / clientactor.py
index 77d85d640ca68ac61cfde028ac37dd82ea7c88bb..46a103eb02985a7ba9e24af464e96e0cf6dd5a7b 100644 (file)
@@ -34,24 +34,27 @@ class RemotePollLoopActor(actor_class):
     If you also define an _item_key method, this class will support
     subscribing to a specific item by key in responses.
     """
+    CLIENT_ERRORS = ()
+
     def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
         self._timer = timer_actor
         self._logger = logging.getLogger(self.LOGGER_NAME)
         self._later = self.actor_ref.proxy()
+        self._polling_started = False
+        self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
         self.poll_wait = self.min_poll_wait
-        self.last_poll_time = None
         self.all_subscribers = set()
         self.key_subscribers = {}
         if hasattr(self, '_item_key'):
             self.subscribe_to = self._subscribe_to
 
     def _start_polling(self):
-        if self.last_poll_time is None:
-            self.last_poll_time = time.time()
+        if not self._polling_started:
+            self._polling_started = True
             self._later.poll()
 
     def subscribe(self, subscriber):
@@ -70,6 +73,8 @@ class RemotePollLoopActor(actor_class):
         raise NotImplementedError("subclasses must implement request method")
 
     def _got_response(self, response):
+        self._logger.debug("%s got response with %d items",
+                           self.log_prefix, len(response))
         self.poll_wait = self.min_poll_wait
         _notify_subscribers(response, self.all_subscribers)
         if hasattr(self, '_item_key'):
@@ -79,18 +84,27 @@ class RemotePollLoopActor(actor_class):
 
     def _got_error(self, error):
         self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
-        self._logger.warning("Client error: %s - waiting %s seconds",
-                             error, self.poll_wait)
+        return "{} got error: {} - waiting {} seconds".format(
+            self.log_prefix, error, self.poll_wait)
 
-    def poll(self):
+    def poll(self, scheduled_start=None):
+        self._logger.debug("%s sending poll", self.log_prefix)
         start_time = time.time()
+        if scheduled_start is None:
+            scheduled_start = start_time
         try:
             response = self._send_request()
-        except self.CLIENT_ERRORS as error:
-            self.last_poll_time = start_time
-            self._got_error(error)
+        except Exception as error:
+            errmsg = self._got_error(error)
+            if isinstance(error, self.CLIENT_ERRORS):
+                self._logger.warning(errmsg)
+            else:
+                self._logger.exception(errmsg)
+            next_poll = start_time + self.poll_wait
         else:
-            self.last_poll_time += self.poll_wait
             self._got_response(response)
-        self._timer.schedule(self.last_poll_time + self.poll_wait,
-                             self._later.poll)
+            next_poll = scheduled_start + self.poll_wait
+        end_time = time.time()
+        if next_poll < end_time:  # We've drifted too much; start fresh.
+            next_poll = end_time + self.poll_wait
+        self._timer.schedule(next_poll, self._later.poll, next_poll)