#!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
from __future__ import absolute_import, print_function
super(RemotePollLoopActor, self).__init__()
self._client = client
self._timer = timer_actor
- self._later = self.actor_ref.proxy()
+ self._later = self.actor_ref.tell_proxy()
self._polling_started = False
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
def subscribe(self, subscriber):
# Add `subscriber` to self.all_subscribers, log the subscription at debug
# level, then call self._start_polling().
self.all_subscribers.add(subscriber)
# The replacement log line reads subscriber.actor_ref.actor_urn instead of
# formatting the subscriber with %r, so the subscriber must expose
# actor_ref.actor_urn.
- self._logger.debug("%r subscribed to all events", subscriber)
+ self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
self._start_polling()
# __init__ exposes this method to the proxy if the subclass defines
# _item_key.
def _subscribe_to(self, key, subscriber):
# Add `subscriber` to the set stored under `key` in self.key_subscribers
# (setdefault creates an empty set the first time a key is seen), log the
# subscription at debug level, then call self._start_polling().
self.key_subscribers.setdefault(key, set()).add(subscriber)
# The replacement log line reads subscriber.actor_ref.actor_urn instead of
# formatting the subscriber with %r, so the subscriber must expose
# actor_ref.actor_urn.
- self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+ self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
self._start_polling()
def _send_request(self):
# Placeholder that always raises NotImplementedError; per the message,
# subclasses are expected to supply the actual request method.
raise NotImplementedError("subclasses must implement request method")
def _got_response(self, response):
- self._logger.debug("got response with %d items", len(response))
self.poll_wait = self.min_poll_wait
_notify_subscribers(response, self.all_subscribers)
if hasattr(self, '_item_key'):
def _got_error(self, error):
# Back off after a failed poll: double self.poll_wait, capped at
# self.max_poll_wait, and return a human-readable message describing the
# error and the new wait. Nothing is logged here; the string is returned.
self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
# The replacement message drops self.log_prefix and rewords the wait text.
- return "{} got error: {} - waiting {} seconds".format(
- self.log_prefix, error, self.poll_wait)
+ return "got error: {} - will try again in {} seconds".format(
+ error, self.poll_wait)
def is_common_error(self, exception):
# Always returns False here; `exception` is not inspected.
# NOTE(review): presumably subclasses override this to flag expected
# errors - confirm against the callers.
return False
else:
self._got_response(response)
next_poll = scheduled_start + self.poll_wait
+ self._logger.info("got response with %d items in %s seconds, next poll at %s",
+ len(response), (time.time() - scheduled_start),
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
end_time = time.time()
if next_poll < end_time: # We've drifted too much; start fresh.
next_poll = end_time + self.poll_wait