3410: Merge branch 'master' into 3410-replication-attrs
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import time
6 import unittest
7
8 import mock
9 import pykka
10
11 import arvnodeman.daemon as nmdaemon
12 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
13 from . import testutil
14
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                     unittest.TestCase):
    """Tests for nmdaemon.NodeManagerDaemonActor.

    Every collaborator the daemon is started with (pollers, client
    factories, timer, setup/shutdown actor classes) is a mock.  Tests drive
    the daemon through its proxy methods, then inspect those mocks and the
    live ComputeNodeMonitorActors registered with pykka.
    """

    def new_setup_proxy(self):
        """Return a new setup-actor proxy mock and remember it.

        Installed as the side effect of ``node_setup.start().proxy`` so that
        every setup actor the daemon starts gets a distinct mock object, and
        ``self.last_setup`` always refers to the most recent one.
        """
        self.last_setup = mock.MagicMock(name='setup_proxy_mock')
        return self.last_setup

    def make_daemon(self, cloud_nodes=(), arvados_nodes=(), want_sizes=(),
                    min_size=testutil.MockSize(1), min_nodes=0, max_nodes=8):
        """Start a daemon wired to mocks and feed it its initial state.

        Sets ``self.daemon`` to the daemon's proxy and stores each mock
        collaborator as an attribute of the test case.

        Args:
            cloud_nodes: listing sent via update_cloud_nodes, or None to
                send nothing.
            arvados_nodes: listing sent via update_arvados_nodes, or None
                to send nothing.
            want_sizes: wishlist sent via update_server_wishlist, or None
                to send nothing.
            min_size, min_nodes, max_nodes: passed straight to the daemon.
        """
        for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
            setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
        self.arv_factory = mock.MagicMock(name='arvados_mock')
        self.cloud_factory = mock.MagicMock(name='cloud_mock')
        self.cloud_factory().node_start_time.return_value = time.time()
        self.cloud_updates = mock.MagicMock(name='updates_mock')
        # deliver_immediately=False: tests fire pending timer callbacks
        # explicitly with self.timer.deliver().
        self.timer = testutil.MockTimer(deliver_immediately=False)
        self.node_setup = mock.MagicMock(name='setup_mock')
        self.node_setup.start().proxy.side_effect = self.new_setup_proxy
        # Forget the start() call made while configuring the mock above so
        # assertions only see calls made by the daemon itself.  reset_mock()
        # keeps the side_effect installed on proxy.
        self.node_setup.reset_mock()
        self.node_shutdown = mock.MagicMock(name='shutdown_mock')
        # NOTE(review): the meaning of the positional settings [54, 5, 1],
        # 600, 1800 and 3600 is defined by NodeManagerDaemonActor.__init__;
        # confirm there before changing them.
        self.daemon = nmdaemon.NodeManagerDaemonActor.start(
            self.server_wishlist_poller, self.arvados_nodes_poller,
            self.cloud_nodes_poller, self.cloud_updates, self.timer,
            self.arv_factory, self.cloud_factory,
            [54, 5, 1], min_size, min_nodes, max_nodes, 600, 1800, 3600,
            self.node_setup, self.node_shutdown).proxy()
        # Send fresh lists so the actor never shares a caller's list (or a
        # default value) with the test.
        if cloud_nodes is not None:
            self.daemon.update_cloud_nodes(
                list(cloud_nodes)).get(self.TIMEOUT)
        if arvados_nodes is not None:
            self.daemon.update_arvados_nodes(
                list(arvados_nodes)).get(self.TIMEOUT)
        if want_sizes is not None:
            self.daemon.update_server_wishlist(
                list(want_sizes)).get(self.TIMEOUT)

    def monitor_list(self):
        """Return refs to every ComputeNodeMonitorActor in the registry."""
        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)

    def monitored_arvados_nodes(self):
        """Return the arvados_node of each monitor that is still alive.

        Monitors that die before answering (ActorDeadError) are skipped.
        """
        pairings = []
        for future in [actor.proxy().arvados_node
                       for actor in self.monitor_list()]:
            try:
                pairings.append(future.get(self.TIMEOUT))
            except pykka.ActorDeadError:
                pass
        return pairings

    def alive_monitor_count(self):
        """Return how many monitors answered monitored_arvados_nodes()."""
        return len(self.monitored_arvados_nodes())

    def assertShutdownCancellable(self, expected=True):
        """Assert a shutdown actor was started with cancellable=*expected*.

        Only the most recent node_shutdown.start() call is inspected.
        """
        self.assertTrue(self.node_shutdown.start.called)
        self.assertIs(expected,
                      self.node_shutdown.start.call_args[1]['cancellable'],
                      "ComputeNodeShutdownActor incorrectly cancellable")

    def test_easy_node_creation(self):
        size = testutil.MockSize(1)
        self.make_daemon(want_sizes=[size])
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def check_monitors_arvados_nodes(self, *arv_nodes):
        """Assert the live monitors hold exactly *arv_nodes*, in any order."""
        # assertItemsEqual was renamed assertCountEqual in Python 3.
        assert_same_items = getattr(self, 'assertCountEqual', None)
        if assert_same_items is None:
            assert_same_items = self.assertItemsEqual
        assert_same_items(arv_nodes, self.monitored_arvados_nodes())

    def test_node_pairing(self):
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        self.make_daemon([cloud_node], [arv_node])
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)

    def test_node_pairing_after_arvados_update(self):
        cloud_node = testutil.cloud_node_mock(2)
        self.make_daemon([cloud_node],
                         [testutil.arvados_node_mock(2, ip_address=None)])
        arv_node = testutil.arvados_node_mock(2)
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)

    def test_arvados_node_un_and_re_paired(self):
        arv_node = testutil.arvados_node_mock(3)
        self.make_daemon([testutil.cloud_node_mock(3)], [arv_node])
        self.check_monitors_arvados_nodes(arv_node)
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.assertEqual(0, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)

    def test_old_arvados_node_not_double_assigned(self):
        arv_node = testutil.arvados_node_mock(3, age=9000)
        size = testutil.MockSize(3)
        self.make_daemon(arvados_nodes=[arv_node])
        self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
        self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        used_nodes = [call[1].get('arvados_node')
                      for call in self.node_setup.start.call_args_list]
        self.assertEqual(2, len(used_nodes))
        self.assertIn(arv_node, used_nodes)
        self.assertIn(None, used_nodes)

    def test_node_count_satisfied(self):
        self.make_daemon([testutil.cloud_node_mock()],
                         want_sizes=[testutil.MockSize(1)])
        self.stop_proxy(self.daemon)
        # The daemon boots nodes through node_setup.start(); node_setup
        # itself is never called, so checking node_setup.called would pass
        # vacuously.
        self.assertFalse(self.node_setup.start.called)

    def test_booting_nodes_counted(self):
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        server_wishlist = [testutil.MockSize(1)] * 2
        self.make_daemon([cloud_node], [arv_node], server_wishlist)
        # Reading an attribute through the proxy is used here as a barrier
        # so the daemon has handled the messages sent above.
        self.daemon.max_nodes.get(self.TIMEOUT)
        self.assertTrue(self.node_setup.start.called)
        self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)

    def test_boot_new_node_when_all_nodes_busy(self):
        arv_node = testutil.arvados_node_mock(2, job_uuid=True)
        self.make_daemon([testutil.cloud_node_mock(2)], [arv_node],
                         [testutil.MockSize(2)])
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def test_boot_new_node_below_min_nodes(self):
        min_size = testutil.MockSize(1)
        wish_size = testutil.MockSize(3)
        self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
        self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual([wish_size, min_size],
                         [call[1].get('cloud_size')
                          for call in self.node_setup.start.call_args_list])

    def test_no_new_node_when_ge_min_nodes_busy(self):
        cloud_nodes = [testutil.cloud_node_mock(n) for n in range(1, 4)]
        arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
                     for n in range(1, 4)]
        self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
        self.stop_proxy(self.daemon)
        self.assertEqual(0, self.node_setup.start.call_count)

    def test_no_new_node_when_max_nodes_busy(self):
        self.make_daemon([testutil.cloud_node_mock(3)],
                         [testutil.arvados_node_mock(3, job_uuid=True)],
                         [testutil.MockSize(3)],
                         max_nodes=1)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_setup.start.called)

    def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
        """Start a daemon that wants one node and return its setup mock.

        The returned setup proxy mock is primed so that its cloud_node and
        arvados_node futures resolve to *cloud_node* and *arv_node*.  Either
        one is built from *id_num* when not supplied.
        """
        if cloud_node is None:
            cloud_node = testutil.cloud_node_mock(id_num)
        if arv_node is None:
            arv_node = testutil.arvados_node_mock(id_num)
        self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
        # Barrier read through the proxy (see test_booting_nodes_counted).
        self.daemon.max_nodes.get(self.TIMEOUT)
        self.assertEqual(1, self.node_setup.start.call_count)
        self.last_setup.cloud_node.get.return_value = cloud_node
        self.last_setup.arvados_node.get.return_value = arv_node
        return self.last_setup

    def test_no_duplication_when_booting_node_listed_fast(self):
        # Test that we don't start two ComputeNodeMonitorActors when
        # we learn about a booting node through a listing before we
        # get the "node up" message from CloudNodeSetupActor.
        cloud_node = testutil.cloud_node_mock(1)
        setup = self.start_node_boot(cloud_node)
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_no_duplication_when_booted_node_listed(self):
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node, id_num=2)
        self.daemon.node_up(setup)
        self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_node_counted_after_boot_with_slow_listing(self):
        # Test that, after we boot a compute node, we assume it exists
        # even it doesn't appear in the listing (e.g., because of delays
        # propagating tags).
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_booted_unlisted_node_counted(self):
        setup = self.start_node_boot(id_num=1)
        self.daemon.node_up(setup)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(1)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)

    def test_booted_node_can_shutdown(self):
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_shutdown.start.called,
                        "daemon did not shut down booted node on offer")

    def test_booted_node_lifecycle(self):
        cloud_node = testutil.cloud_node_mock(6)
        setup = self.start_node_boot(cloud_node, id_num=6)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertShutdownCancellable(True)
        shutdown = self.node_shutdown.start().proxy()
        shutdown.cloud_node.get.return_value = cloud_node
        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
        self.assertTrue(shutdown.stop.called,
                        "shutdown actor not stopped after finishing")
        self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
                        "monitor for booted node not stopped after shutdown")
        self.daemon.update_server_wishlist(
            [testutil.MockSize(2)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called,
                        "second node not started after booted node stopped")

    def test_booted_node_shut_down_when_never_listed(self):
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.assertFalse(self.node_shutdown.start.called)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)

    def test_booted_node_shut_down_when_never_paired(self):
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)

    def test_node_that_pairs_not_considered_failed_boot(self):
        cloud_node = testutil.cloud_node_mock(3)
        arv_node = testutil.arvados_node_mock(3)
        setup = self.start_node_boot(cloud_node, arv_node)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_booting_nodes_shut_down(self):
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)

    def test_shutdown_declined_at_wishlist_capacity(self):
        cloud_node = testutil.cloud_node_mock(1)
        size = testutil.MockSize(1)
        self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_shutdown_declined_below_min_nodes(self):
        cloud_node = testutil.cloud_node_mock(1)
        self.make_daemon(cloud_nodes=[cloud_node], min_nodes=1)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_shutdown_accepted_below_capacity(self):
        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_shutdown.start.called)

    def test_shutdown_declined_when_idle_and_job_queued(self):
        cloud_nodes = [testutil.cloud_node_mock(n) for n in [3, 4]]
        arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                     testutil.arvados_node_mock(4, job_uuid=None)]
        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(1)])
        self.assertEqual(2, self.alive_monitor_count())
        for mon_ref in self.monitor_list():
            monitor = mon_ref.proxy()
            if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
                break
        else:
            self.fail("monitor for idle node not found")
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_node_shutdown_after_cancelled_shutdown(self):
        cloud_node = testutil.cloud_node_mock(5)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        shutdown_proxy = self.node_shutdown.start().proxy
        shutdown_proxy().cloud_node.get.return_value = cloud_node
        shutdown_proxy().success.get.return_value = False
        shutdown_proxy.reset_mock()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(shutdown_proxy.called)
        self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
        shutdown_proxy().success.get.return_value = True
        shutdown_proxy.reset_mock()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(shutdown_proxy.called)

    def test_nodes_shutting_down_replaced_below_max_nodes(self):
        cloud_node = testutil.cloud_node_mock(6)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(self.node_shutdown.start.called)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(6)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
        cloud_node = testutil.cloud_node_mock(7)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                         max_nodes=1)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(self.node_shutdown.start.called)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(7)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_setup.start.called)

    def test_nodes_shutting_down_count_against_excess(self):
        cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
        arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
        self.assertEqual(2, self.alive_monitor_count())
        for mon_ref in self.monitor_list():
            self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
        self.assertEqual(1, self.node_shutdown.start.call_count)

    def test_clean_shutdown_waits_for_node_setup_finish(self):
        new_node = self.start_node_boot()
        self.daemon.shutdown().get(self.TIMEOUT)
        self.assertTrue(new_node.stop_if_no_cloud_node.called)
        self.daemon.node_up(new_node).get(self.TIMEOUT)
        self.assertTrue(new_node.stop.called)
        self.timer.deliver()
        self.assertTrue(
            self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))

    def test_wishlist_ignored_after_shutdown(self):
        size = testutil.MockSize(2)
        self.make_daemon(want_sizes=[size])
        self.daemon.shutdown().get(self.TIMEOUT)
        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)