16212: Merge branch 'master'
[arvados.git] / services / nodemanager / tests / test_clientactor.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import unittest
9
10 import mock
11 import pykka
12
13 import arvnodeman.clientactor as clientactor
14 from . import testutil
15
16 class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
17                                   unittest.TestCase):
18     class MockClientError(Exception):
19         pass
20
21     class TestActor(clientactor.RemotePollLoopActor):
22         LOGGER_NAME = 'arvnodeman.testpoll'
23
24         def _send_request(self):
25             return self._client()
26     TestActor.CLIENT_ERRORS = (MockClientError,)
27     TEST_CLASS = TestActor
28
29
30     def build_monitor(self, side_effect, *args, **kwargs):
31         super(RemotePollLoopActorTestCase, self).build_monitor(*args, **kwargs)
32         self.client.side_effect = side_effect
33
34     def test_poll_loop_starts_after_subscription(self):
35         self.build_monitor(['test1'])
36         self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
37         self.stop_proxy(self.monitor)
38         self.subscriber.assert_called_with('test1')
39         self.assertTrue(self.timer.schedule.called)
40
41     def test_poll_loop_continues_after_failure(self):
42         self.build_monitor(self.MockClientError)
43         self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
44         self.assertTrue(self.stop_proxy(self.monitor),
45                         "poll loop died after error")
46         self.assertTrue(self.timer.schedule.called,
47                         "poll loop did not reschedule after error")
48         self.assertFalse(self.subscriber.called,
49                          "poll loop notified subscribers after error")
50
51     def test_late_subscribers_get_responses(self):
52         self.build_monitor(['pre_late_test', 'late_test'])
53         mock_subscriber = mock.Mock(name='mock_subscriber')
54         self.monitor.subscribe(mock_subscriber).get(self.TIMEOUT)
55         self.monitor.subscribe(self.subscriber)
56         self.monitor.poll().get(self.TIMEOUT)
57         self.stop_proxy(self.monitor)
58         self.subscriber.assert_called_with('late_test')
59
60     def test_survive_dead_subscriptions(self):
61         self.build_monitor(['survive1', 'survive2'])
62         dead_subscriber = mock.Mock(name='dead_subscriber')
63         dead_subscriber.side_effect = pykka.ActorDeadError
64         self.monitor.subscribe(dead_subscriber)
65         self.monitor.subscribe(self.subscriber)
66         self.monitor.poll().get(self.TIMEOUT)
67         self.assertTrue(self.stop_proxy(self.monitor),
68                         "poll loop died from dead subscriber")
69         self.subscriber.assert_called_with('survive2')
70
71     def check_poll_timers(self, *test_times):
72         schedule_mock = self.timer.schedule
73         last_expect = None
74         with mock.patch('time.time') as time_mock:
75             for fake_time, expect_next in test_times:
76                 time_mock.return_value = fake_time
77                 self.monitor.poll(last_expect).get(self.TIMEOUT)
78                 self.assertTrue(schedule_mock.called)
79                 self.assertEqual(expect_next, schedule_mock.call_args[0][0])
80                 schedule_mock.reset_mock()
81                 last_expect = expect_next
82
83     def test_poll_timing_on_consecutive_successes_with_drift(self):
84         self.build_monitor(['1', '2'], poll_wait=3, max_poll_wait=14)
85         self.check_poll_timers((0, 3), (4, 6))
86
87     def test_poll_backoff_on_failures(self):
88         self.build_monitor(self.MockClientError, poll_wait=3, max_poll_wait=14)
89         self.check_poll_timers((0, 6), (6, 18), (18, 32))
90
91     def test_poll_timing_after_error_recovery(self):
92         self.build_monitor(['a', self.MockClientError(), 'b'],
93                            poll_wait=3, max_poll_wait=14)
94         self.check_poll_timers((0, 3), (4, 10), (10, 13))
95
96     def test_no_subscriptions_by_key_without_support(self):
97         self.build_monitor([])
98         with self.assertRaises(AttributeError):
99             self.monitor.subscribe_to('key')
100
101
102 class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
103                                           unittest.TestCase):
104     class TestActor(RemotePollLoopActorTestCase.TestActor):
105         def _item_key(self, item):
106             return item['key']
107     TEST_CLASS = TestActor
108
109
110     def build_monitor(self, side_effect, *args, **kwargs):
111         super(RemotePollLoopActorWithKeysTestCase, self).build_monitor(
112             *args, **kwargs)
113         self.client.side_effect = side_effect
114
115     def test_key_subscription(self):
116         self.build_monitor([[{'key': 1}, {'key': 2}]])
117         self.monitor.subscribe_to(2, self.subscriber).get(self.TIMEOUT)
118         self.stop_proxy(self.monitor)
119         self.subscriber.assert_called_with({'key': 2})
120
121     def test_survive_dead_key_subscriptions(self):
122         item = {'key': 3}
123         self.build_monitor([[item], [item]])
124         dead_subscriber = mock.Mock(name='dead_subscriber')
125         dead_subscriber.side_effect = pykka.ActorDeadError
126         self.monitor.subscribe_to(3, dead_subscriber)
127         self.monitor.subscribe_to(3, self.subscriber)
128         self.monitor.poll().get(self.TIMEOUT)
129         self.assertTrue(self.stop_proxy(self.monitor),
130                         "poll loop died from dead key subscriber")
131         self.subscriber.assert_called_with(item)
132
133     def test_mixed_subscriptions(self):
134         item = {'key': 4}
135         self.build_monitor([[item], [item]])
136         key_subscriber = mock.Mock(name='key_subscriber')
137         self.monitor.subscribe(self.subscriber)
138         self.monitor.subscribe_to(4, key_subscriber)
139         self.monitor.poll().get(self.TIMEOUT)
140         self.stop_proxy(self.monitor)
141         self.subscriber.assert_called_with([item])
142         key_subscriber.assert_called_with(item)
143
144     def test_subscription_to_missing_key(self):
145         self.build_monitor([[]])
146         self.monitor.subscribe_to('nonesuch', self.subscriber).get(self.TIMEOUT)
147         self.stop_proxy(self.monitor)
148         self.subscriber.assert_called_with(None)
149
150
151 if __name__ == '__main__':
152     unittest.main()