Merge branch '8497-datamanager-batchsize-1000' of https://github.com/wtsi-hgi/arvados...
[arvados.git] / services / nodemanager / tests / test_clientactor.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import unittest
6
7 import mock
8 import pykka
9
10 import arvnodeman.clientactor as clientactor
11 from . import testutil
12
class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                  unittest.TestCase):
    """Tests for RemotePollLoopActor, driven through a minimal subclass.

    The fixtures used below (self.client, self.monitor, self.timer,
    self.subscriber, self.TIMEOUT, self.stop_proxy) are not defined in
    this class; they are inherited from the base classes.
    """

    class MockClientError(Exception):
        """Exception type the test actor is configured to treat as a
        client error."""

    class TestActor(clientactor.RemotePollLoopActor):
        """Concrete actor whose request simply calls the client."""

        LOGGER_NAME = 'arvnodeman.testpoll'

        def _send_request(self):
            # The "remote request" is just a call to the client object,
            # so each test controls the response via client.side_effect.
            response = self._client()
            return response

    # Assigned after the class statement: MockClientError lives in the
    # enclosing class namespace, which is not visible from inside
    # TestActor's own class body.
    TestActor.CLIENT_ERRORS = (MockClientError,)
    TEST_CLASS = TestActor

    def build_monitor(self, side_effect, *args, **kwargs):
        """Build the monitor via the parent class, then script the client.

        side_effect is installed as the client mock's side_effect; all
        other arguments are forwarded unchanged to the parent
        build_monitor().
        """
        parent = super(RemotePollLoopActorTestCase, self)
        parent.build_monitor(*args, **kwargs)
        self.client.side_effect = side_effect

    def test_poll_loop_starts_after_subscription(self):
        # Subscribing alone must deliver the first response and
        # schedule another poll.
        self.build_monitor(['test1'])
        subscription = self.monitor.subscribe(self.subscriber)
        subscription.get(self.TIMEOUT)
        self.stop_proxy(self.monitor)
        self.subscriber.assert_called_with('test1')
        self.assertTrue(self.timer.schedule.called)

    def test_poll_loop_continues_after_failure(self):
        # Every client call raises MockClientError here.
        self.build_monitor(self.MockClientError)
        subscription = self.monitor.subscribe(self.subscriber)
        subscription.get(self.TIMEOUT)
        stopped = self.stop_proxy(self.monitor)
        self.assertTrue(stopped, "poll loop died after error")
        rescheduled = self.timer.schedule.called
        self.assertTrue(rescheduled,
                        "poll loop did not reschedule after error")
        notified = self.subscriber.called
        self.assertFalse(notified,
                         "poll loop notified subscribers after error")

    def test_late_subscribers_get_responses(self):
        # A subscriber added after the first subscription must still
        # receive the response from the next explicit poll.
        self.build_monitor(['pre_late_test', 'late_test'])
        early_subscriber = mock.Mock(name='mock_subscriber')
        self.monitor.subscribe(early_subscriber).get(self.TIMEOUT)
        self.monitor.subscribe(self.subscriber)
        poll_result = self.monitor.poll()
        poll_result.get(self.TIMEOUT)
        self.stop_proxy(self.monitor)
        self.subscriber.assert_called_with('late_test')

    def test_survive_dead_subscriptions(self):
        # One subscriber raises ActorDeadError whenever it is called;
        # the other subscriber must still be notified.
        self.build_monitor(['survive1', 'survive2'])
        dead_subscriber = mock.Mock(name='dead_subscriber',
                                    side_effect=pykka.ActorDeadError)
        self.monitor.subscribe(dead_subscriber)
        self.monitor.subscribe(self.subscriber)
        poll_result = self.monitor.poll()
        poll_result.get(self.TIMEOUT)
        stopped = self.stop_proxy(self.monitor)
        self.assertTrue(stopped, "poll loop died from dead subscriber")
        self.subscriber.assert_called_with('survive2')

    def check_poll_timers(self, *test_times):
        """Poll once per (fake_time, expected_next) pair and check timing.

        For each pair, time.time is patched to return fake_time, the
        monitor is polled, and the first positional argument of the
        timer's schedule() call must equal expected_next. The expected
        value from one step is passed as the argument to poll() in the
        next step (None for the first).
        """
        schedule = self.timer.schedule
        previous_expected = None
        with mock.patch('time.time') as clock:
            for now, expected_next in test_times:
                clock.return_value = now
                poll_result = self.monitor.poll(previous_expected)
                poll_result.get(self.TIMEOUT)
                self.assertTrue(schedule.called)
                scheduled_args = schedule.call_args[0]
                self.assertEqual(expected_next, scheduled_args[0])
                # Clear the call record so the next step only sees its
                # own schedule() call.
                schedule.reset_mock()
                previous_expected = expected_next

    def test_poll_timing_on_consecutive_successes_with_drift(self):
        # The second poll happens at t=4, one second after its
        # expected time of 3; the next poll is still expected at 6.
        self.build_monitor(['1', '2'], poll_wait=3, max_poll_wait=14)
        self.check_poll_timers((0, 3), (4, 6))

    def test_poll_backoff_on_failures(self):
        # With every request failing, the expected delays are 6, 12
        # and then 14 seconds.
        self.build_monitor(self.MockClientError,
                           poll_wait=3, max_poll_wait=14)
        self.check_poll_timers((0, 6), (6, 18), (18, 32))

    def test_poll_timing_after_error_recovery(self):
        # Success, failure, success: the expected delay is back to 3
        # after the final success.
        responses = ['a', self.MockClientError(), 'b']
        self.build_monitor(responses, poll_wait=3, max_poll_wait=14)
        self.check_poll_timers((0, 3), (4, 10), (10, 13))

    def test_no_subscriptions_by_key_without_support(self):
        # TestActor defines no _item_key here, and subscribing by key
        # is expected to fail with AttributeError.
        self.build_monitor([])
        with self.assertRaises(AttributeError):
            self.monitor.subscribe_to('key')
97
98
class RemotePollLoopActorWithKeysTestCase(
        testutil.RemotePollLoopActorTestMixin, unittest.TestCase):
    """Tests for key-based subscriptions on RemotePollLoopActor.

    The actor under test extends the TestActor from
    RemotePollLoopActorTestCase with an _item_key() method. Fixtures
    such as self.client, self.monitor, self.subscriber, self.TIMEOUT
    and self.stop_proxy are inherited from the base classes.
    """

    class TestActor(RemotePollLoopActorTestCase.TestActor):
        """Test actor whose response items are dicts with a 'key' entry."""

        def _item_key(self, item):
            key = item['key']
            return key

    TEST_CLASS = TestActor

    def build_monitor(self, side_effect, *args, **kwargs):
        """Build the monitor via the parent class, then script the client.

        side_effect is installed as the client mock's side_effect; all
        other arguments are forwarded unchanged to the parent
        build_monitor().
        """
        parent = super(RemotePollLoopActorWithKeysTestCase, self)
        parent.build_monitor(*args, **kwargs)
        self.client.side_effect = side_effect

    def test_key_subscription(self):
        # A key subscriber receives only the item matching its key.
        items = [{'key': 1}, {'key': 2}]
        self.build_monitor([items])
        subscription = self.monitor.subscribe_to(2, self.subscriber)
        subscription.get(self.TIMEOUT)
        self.stop_proxy(self.monitor)
        self.subscriber.assert_called_with({'key': 2})

    def test_survive_dead_key_subscriptions(self):
        # One key subscriber raises ActorDeadError whenever it is
        # called; the other subscriber on the same key must still be
        # notified.
        item = {'key': 3}
        self.build_monitor([[item], [item]])
        dead_subscriber = mock.Mock(name='dead_subscriber',
                                    side_effect=pykka.ActorDeadError)
        self.monitor.subscribe_to(3, dead_subscriber)
        self.monitor.subscribe_to(3, self.subscriber)
        poll_result = self.monitor.poll()
        poll_result.get(self.TIMEOUT)
        stopped = self.stop_proxy(self.monitor)
        self.assertTrue(stopped, "poll loop died from dead key subscriber")
        self.subscriber.assert_called_with(item)

    def test_mixed_subscriptions(self):
        # A general subscriber gets the whole response list; a key
        # subscriber gets just the matching item.
        item = {'key': 4}
        self.build_monitor([[item], [item]])
        keyed_subscriber = mock.Mock(name='key_subscriber')
        self.monitor.subscribe(self.subscriber)
        self.monitor.subscribe_to(4, keyed_subscriber)
        poll_result = self.monitor.poll()
        poll_result.get(self.TIMEOUT)
        self.stop_proxy(self.monitor)
        self.subscriber.assert_called_with([item])
        keyed_subscriber.assert_called_with(item)

    def test_subscription_to_missing_key(self):
        # Subscribing to a key absent from the response yields None.
        self.build_monitor([[]])
        subscription = self.monitor.subscribe_to('nonesuch',
                                                 self.subscriber)
        subscription.get(self.TIMEOUT)
        self.stop_proxy(self.monitor)
        self.subscriber.assert_called_with(None)
146
147
# Run this module's test cases when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()