projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
12431: Use libcloud fork 2.2.2.dev1
[arvados.git]
/
services
/
nodemanager
/
arvnodeman
/
clientactor.py
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 46a103eb02985a7ba9e24af464e96e0cf6dd5a7b..afc4f1cb5845d3e73e8d65dbb6658659594a686b 100644
(file)
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -1,4 +1,7 @@
#!/usr/bin/env python
#!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
from __future__ import absolute_import, print_function
from __future__ import absolute_import, print_function
@@ -30,20 +33,16 @@ class RemotePollLoopActor(actor_class):
response to subscribers. It takes care of error handling, and retrying
requests with exponential backoff.
response to subscribers. It takes care of error handling, and retrying
requests with exponential backoff.
- To use this actor, define CLIENT_ERRORS and the _send_request method.
- If you also define an _item_key method, this class will support
- subscribing to a specific item by key in responses.
+ To use this actor, define the _send_request method. If you also
+ define an _item_key method, this class will support subscribing to
+ a specific item by key in responses.
"""
"""
- CLIENT_ERRORS = ()
-
def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
super(RemotePollLoopActor, self).__init__()
self._client = client
self._timer = timer_actor
def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
super(RemotePollLoopActor, self).__init__()
self._client = client
self._timer = timer_actor
- self._logger = logging.getLogger(self.LOGGER_NAME)
- self._later = self.actor_ref.proxy()
+ self._later = self.actor_ref.tell_proxy()
self._polling_started = False
self._polling_started = False
- self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
self.poll_wait = self.min_poll_wait
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
self.poll_wait = self.min_poll_wait
@@ -52,6 +51,9 @@ class RemotePollLoopActor(actor_class):
if hasattr(self, '_item_key'):
self.subscribe_to = self._subscribe_to
if hasattr(self, '_item_key'):
self.subscribe_to = self._subscribe_to
+ def on_start(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, id(self.actor_urn[9:])))
+
def _start_polling(self):
if not self._polling_started:
self._polling_started = True
def _start_polling(self):
if not self._polling_started:
self._polling_started = True
@@ -59,22 +61,20 @@ class RemotePollLoopActor(actor_class):
def subscribe(self, subscriber):
self.all_subscribers.add(subscriber)
def subscribe(self, subscriber):
self.all_subscribers.add(subscriber)
- self._logger.debug("%r subscribed to all events", subscriber)
+ self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
self._start_polling()
# __init__ exposes this method to the proxy if the subclass defines
# _item_key.
def _subscribe_to(self, key, subscriber):
self.key_subscribers.setdefault(key, set()).add(subscriber)
self._start_polling()
# __init__ exposes this method to the proxy if the subclass defines
# _item_key.
def _subscribe_to(self, key, subscriber):
self.key_subscribers.setdefault(key, set()).add(subscriber)
- self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+ self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
self._start_polling()
def _send_request(self):
raise NotImplementedError("subclasses must implement request method")
def _got_response(self, response):
self._start_polling()
def _send_request(self):
raise NotImplementedError("subclasses must implement request method")
def _got_response(self, response):
- self._logger.debug("%s got response with %d items",
- self.log_prefix, len(response))
self.poll_wait = self.min_poll_wait
_notify_subscribers(response, self.all_subscribers)
if hasattr(self, '_item_key'):
self.poll_wait = self.min_poll_wait
_notify_subscribers(response, self.all_subscribers)
if hasattr(self, '_item_key'):
@@ -84,11 +84,14 @@ class RemotePollLoopActor(actor_class):
def _got_error(self, error):
self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
def _got_error(self, error):
self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
- return "{} got error: {} - waiting {} seconds".format(
- self.log_prefix, error, self.poll_wait)
+ return "got error: {} - will try again in {} seconds".format(
+ error, self.poll_wait)
+
+ def is_common_error(self, exception):
+ return False
def poll(self, scheduled_start=None):
def poll(self, scheduled_start=None):
- self._logger.debug("%s sending poll", self.log_prefix)
+ self._logger.debug("sending request")
start_time = time.time()
if scheduled_start is None:
scheduled_start = start_time
start_time = time.time()
if scheduled_start is None:
scheduled_start = start_time
@@ -96,7 +99,7 @@ class RemotePollLoopActor(actor_class):
response = self._send_request()
except Exception as error:
errmsg = self._got_error(error)
response = self._send_request()
except Exception as error:
errmsg = self._got_error(error)
- if isinstance(error, self.CLIENT_ERRORS):
+ if self.is_common_error(error):
self._logger.warning(errmsg)
else:
self._logger.exception(errmsg)
self._logger.warning(errmsg)
else:
self._logger.exception(errmsg)
@@ -104,6 +107,9 @@ class RemotePollLoopActor(actor_class):
else:
self._got_response(response)
next_poll = scheduled_start + self.poll_wait
else:
self._got_response(response)
next_poll = scheduled_start + self.poll_wait
+ self._logger.info("got response with %d items in %s seconds, next poll at %s",
+ len(response), (time.time() - scheduled_start),
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
end_time = time.time()
if next_poll < end_time: # We've drifted too much; start fresh.
next_poll = end_time + self.poll_wait
end_time = time.time()
if next_poll < end_time: # We've drifted too much; start fresh.
next_poll = end_time + self.poll_wait