From 9362030c75c09e363f95dcf742f79570107928d1 Mon Sep 17 00:00:00 2001 From: Brett Smith Date: Tue, 21 Oct 2014 14:31:44 -0400 Subject: [PATCH] 4139: Improve scheduling of Node Manager polls. * Catch all exceptions, so that we keep polling no matter what happens. Use CLIENT_ERRORS as a hint about how much logging we need. * Make the next poll time calculation a little less stateful and easier to follow. * Add tests for poll scheduling. --- .../nodemanager/arvnodeman/clientactor.py | 34 ++++++++++++------- .../nodemanager/tests/test_clientactor.py | 25 ++++++++++++++ 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py index d6b220f791..46a103eb02 100644 --- a/services/nodemanager/arvnodeman/clientactor.py +++ b/services/nodemanager/arvnodeman/clientactor.py @@ -34,25 +34,27 @@ class RemotePollLoopActor(actor_class): If you also define an _item_key method, this class will support subscribing to a specific item by key in responses. """ + CLIENT_ERRORS = () + def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180): super(RemotePollLoopActor, self).__init__() self._client = client self._timer = timer_actor self._logger = logging.getLogger(self.LOGGER_NAME) self._later = self.actor_ref.proxy() + self._polling_started = False self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self)) self.min_poll_wait = poll_wait self.max_poll_wait = max_poll_wait self.poll_wait = self.min_poll_wait - self.last_poll_time = None self.all_subscribers = set() self.key_subscribers = {} if hasattr(self, '_item_key'): self.subscribe_to = self._subscribe_to def _start_polling(self): - if self.last_poll_time is None: - self.last_poll_time = time.time() + if not self._polling_started: + self._polling_started = True self._later.poll() def subscribe(self, subscriber): @@ -82,19 +84,27 @@ class RemotePollLoopActor(actor_class): def _got_error(self, error): self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait) - self._logger.warning("%s got error: %s - waiting %s seconds", - self.log_prefix, error, self.poll_wait) + return "{} got error: {} - waiting {} seconds".format( + self.log_prefix, error, self.poll_wait) - def poll(self): + def poll(self, scheduled_start=None): self._logger.debug("%s sending poll", self.log_prefix) start_time = time.time() + if scheduled_start is None: + scheduled_start = start_time try: response = self._send_request() - except self.CLIENT_ERRORS as error: - self.last_poll_time = start_time - self._got_error(error) + except Exception as error: + errmsg = self._got_error(error) + if isinstance(error, self.CLIENT_ERRORS): + self._logger.warning(errmsg) + else: + self._logger.exception(errmsg) + next_poll = start_time + self.poll_wait else: - self.last_poll_time += self.poll_wait self._got_response(response) - self._timer.schedule(self.last_poll_time + self.poll_wait, - self._later.poll) + next_poll = scheduled_start + self.poll_wait + end_time = time.time() + if next_poll < end_time: # We've drifted too much; start fresh. + next_poll = end_time + self.poll_wait + self._timer.schedule(next_poll, self._later.poll, next_poll) diff --git a/services/nodemanager/tests/test_clientactor.py b/services/nodemanager/tests/test_clientactor.py index 1e4c40ec6b..57a0d32d06 100644 --- a/services/nodemanager/tests/test_clientactor.py +++ b/services/nodemanager/tests/test_clientactor.py @@ -64,6 +64,31 @@ class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin, "poll loop died from dead subscriber") self.subscriber.assert_called_with('survive2') + def check_poll_timers(self, *test_times): + schedule_mock = self.timer.schedule + last_expect = None + with mock.patch('time.time') as time_mock: + for fake_time, expect_next in test_times: + time_mock.return_value = fake_time + self.monitor.poll(last_expect).get(self.TIMEOUT) + self.assertTrue(schedule_mock.called) + self.assertEqual(expect_next, schedule_mock.call_args[0][0]) + schedule_mock.reset_mock() + last_expect = expect_next + + def test_poll_timing_on_consecutive_successes_with_drift(self): + self.build_monitor(['1', '2'], poll_wait=3, max_poll_wait=14) + self.check_poll_timers((0, 3), (4, 6)) + + def test_poll_backoff_on_failures(self): + self.build_monitor(self.MockClientError, poll_wait=3, max_poll_wait=14) + self.check_poll_timers((0, 6), (6, 18), (18, 32)) + + def test_poll_timing_after_error_recovery(self): + self.build_monitor(['a', self.MockClientError(), 'b'], + poll_wait=3, max_poll_wait=14) + self.check_poll_timers((0, 3), (4, 10), (10, 13)) + def test_no_subscriptions_by_key_without_support(self): self.build_monitor([]) with self.assertRaises(AttributeError): -- 2.30.2