Merge branch '4499-one-task-per-input-file-normalize'
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import time
6 import unittest
7
8 import mock
9 import pykka
10
11 import arvnodeman.daemon as nmdaemon
12 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
13 from . import testutil
14
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                     unittest.TestCase):
    """Tests for NodeManagerDaemonActor using mock collaborators.

    Each test starts a real daemon actor through make_daemon(), with
    MagicMock objects standing in for the pollers, the Arvados and cloud
    factories, and the node setup/shutdown actor classes.  Assertions are
    made on which mock actors the daemon started and on the
    ComputeNodeMonitorActor instances found in the pykka registry.
    """

    def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
                    min_nodes=0, max_nodes=8):
        """Start a daemon actor wired to mocks and store its proxy.

        The proxy is saved as self.daemon.  The mock pollers
        (self.cloud_nodes_poller, self.arvados_nodes_poller,
        self.server_wishlist_poller), factories, update actor, timer, and
        the self.node_setup / self.node_shutdown mocks are saved as
        attributes so tests can inspect them.

        cloud_nodes, arvados_nodes, want_sizes: initial lists delivered
          through update_cloud_nodes, update_arvados_nodes, and
          update_server_wishlist, in that order.  Pass None to skip the
          corresponding update entirely.
        min_nodes, max_nodes: forwarded to the daemon constructor.
        """
        # NOTE: the list defaults are shared between calls.  This method
        # only forwards them; it never mutates them.  None cannot be used
        # as the default because it already means "skip this update".
        for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
            setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
        self.arv_factory = mock.MagicMock(name='arvados_mock')
        self.cloud_factory = mock.MagicMock(name='cloud_mock')
        self.cloud_factory().node_start_time.return_value = time.time()
        self.cloud_updates = mock.MagicMock(name='updates_mock')
        # Tests trigger timer delivery explicitly with self.timer.deliver().
        self.timer = testutil.MockTimer(deliver_immediately=False)
        self.node_setup = mock.MagicMock(name='setup_mock')
        self.node_shutdown = mock.MagicMock(name='shutdown_mock')
        # NOTE(review): [54, 5, 1] and 600, 1800, 3600 are positional
        # settings whose meaning is defined by
        # NodeManagerDaemonActor.__init__ (not visible here) -- presumably
        # shutdown windows and timeouts in seconds; confirm there.
        self.daemon = nmdaemon.NodeManagerDaemonActor.start(
            self.server_wishlist_poller, self.arvados_nodes_poller,
            self.cloud_nodes_poller, self.cloud_updates, self.timer,
            self.arv_factory, self.cloud_factory,
            [54, 5, 1], min_nodes, max_nodes, 600, 1800, 3600,
            self.node_setup, self.node_shutdown).proxy()
        # Each .get() blocks until the daemon has handled that update.
        if cloud_nodes is not None:
            self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
        if arvados_nodes is not None:
            self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
        if want_sizes is not None:
            self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)

    def monitor_list(self):
        """Return the registered ComputeNodeMonitorActor actor refs."""
        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)

    def alive_monitor_count(self):
        """Return how many registered monitor actors report is_alive()."""
        return sum(1 for actor in self.monitor_list() if actor.is_alive())

    def assertShutdownCancellable(self, expected=True):
        """Assert a shutdown actor was started with cancellable=expected.

        Checks the keyword arguments of the most recent
        self.node_shutdown.start call; the comparison uses identity, so
        expected must be exactly True or False.
        """
        self.assertTrue(self.node_shutdown.start.called)
        self.assertIs(expected,
                      self.node_shutdown.start.call_args[1]['cancellable'],
                      "ComputeNodeShutdownActor incorrectly cancellable")

    def test_easy_node_creation(self):
        # With no cloud nodes and one wishlist entry, the daemon should
        # start a setup actor.
        size = testutil.MockSize(1)
        self.make_daemon(want_sizes=[size])
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def test_node_pairing(self):
        # A cloud node and an Arvados node supplied together should yield
        # one live monitor holding that exact Arvados node object.
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        self.make_daemon([cloud_node], [arv_node])
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.alive_monitor_count())
        self.assertIs(
            self.monitor_list()[0].proxy().arvados_node.get(self.TIMEOUT),
            arv_node)

    def test_node_pairing_after_arvados_update(self):
        # The first Arvados listing has ip_address=None; a later listing
        # supplies a record built with the default address.  The monitor
        # should end up holding the later record.
        cloud_node = testutil.cloud_node_mock(2)
        self.make_daemon([cloud_node],
                         [testutil.arvados_node_mock(2, ip_address=None)])
        arv_node = testutil.arvados_node_mock(2)
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.alive_monitor_count())
        self.assertIs(
            self.monitor_list()[0].proxy().arvados_node.get(self.TIMEOUT),
            arv_node)

    def test_old_arvados_node_not_double_assigned(self):
        # Two setup actors are requested while only one old Arvados node
        # record exists: one start call must receive that record and the
        # other must receive None.
        arv_node = testutil.arvados_node_mock(3, age=9000)
        size = testutil.MockSize(3)
        self.make_daemon(arvados_nodes=[arv_node])
        setup_ref = self.node_setup.start().proxy().actor_ref
        setup_ref.actor_urn = 0
        # Discard the start() call made just above to reach the mock.
        self.node_setup.start.reset_mock()
        self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
        # Attribute reads go through the actor, so this presumably acts
        # as a barrier until the daemon has caught up.
        self.daemon.max_nodes.get(self.TIMEOUT)
        # Change the URN so the next mock setup actor looks distinct from
        # the first -- presumably the daemon keys setup actors by URN.
        setup_ref.actor_urn += 1
        self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        used_nodes = [call[1].get('arvados_node')
                      for call in self.node_setup.start.call_args_list]
        self.assertEqual(2, len(used_nodes))
        self.assertIn(arv_node, used_nodes)
        self.assertIn(None, used_nodes)

    def test_node_count_satisfied(self):
        # One cloud node already exists for a one-entry wishlist, so no
        # setup actor should be started.
        self.make_daemon([testutil.cloud_node_mock()],
                         want_sizes=[testutil.MockSize(1)])
        self.stop_proxy(self.daemon)
        # Check start.called, not node_setup.called: calling a child
        # attribute of a MagicMock does not mark the parent as called, so
        # asserting on the parent could never fail.
        self.assertFalse(self.node_setup.start.called)

    def test_booting_nodes_counted(self):
        # One node exists and two are wanted, so one setup actor starts.
        # Repeating the same wishlist must not start another.
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        server_wishlist = [testutil.MockSize(1)] * 2
        self.make_daemon([cloud_node], [arv_node], server_wishlist)
        self.daemon.max_nodes.get(self.TIMEOUT)
        self.assertTrue(self.node_setup.start.called)
        self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)

    def test_boot_new_node_when_all_nodes_busy(self):
        # The only node has a job assigned (job_uuid=True), so a wishlist
        # entry should trigger a new setup actor.
        arv_node = testutil.arvados_node_mock(2, job_uuid=True)
        self.make_daemon([testutil.cloud_node_mock(2)], [arv_node],
                         [testutil.MockSize(2)])
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def test_no_new_node_when_max_nodes_busy(self):
        # Same busy-node situation, but max_nodes=1 must prevent any new
        # setup actor from starting.
        self.make_daemon([testutil.cloud_node_mock(3)],
                         [testutil.arvados_node_mock(3, job_uuid=True)],
                         [testutil.MockSize(3)],
                         max_nodes=1)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_setup.start.called)

    def mock_setup_actor(self, cloud_node, arv_node):
        """Configure and return the mock setup actor proxy.

        The returned object is what self.node_setup.start().proxy()
        yields.  Its actor_urn is set to cloud_node.id, and its
        cloud_node.get() / arvados_node.get() return the given nodes.
        The call history of self.node_setup is reset, so later assertions
        only see starts made after this helper ran.
        """
        setup = self.node_setup.start().proxy()
        self.node_setup.reset_mock()
        setup.actor_urn = cloud_node.id
        setup.cloud_node.get.return_value = cloud_node
        setup.arvados_node.get.return_value = arv_node
        return setup

    def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
        """Start a daemon that boots one node; return its mock setup proxy.

        cloud_node and arv_node default to mocks built from id_num.  The
        daemon is created with a single MockSize(id_num) wishlist entry,
        and this helper asserts that exactly one setup actor was started.
        """
        if cloud_node is None:
            cloud_node = testutil.cloud_node_mock(id_num)
        if arv_node is None:
            arv_node = testutil.arvados_node_mock(id_num)
        self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
        self.daemon.max_nodes.get(self.TIMEOUT)
        self.assertEqual(1, self.node_setup.start.call_count)
        return self.mock_setup_actor(cloud_node, arv_node)

    def test_no_duplication_when_booting_node_listed_fast(self):
        # Test that we don't start two ComputeNodeMonitorActors when
        # we learn about a booting node through a listing before we
        # get the "node up" message from CloudNodeSetupActor.
        cloud_node = testutil.cloud_node_mock(1)
        setup = self.start_node_boot(cloud_node)
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_no_duplication_when_booted_node_listed(self):
        # Reverse order of the test above: "node up" arrives first, then
        # the node appears in a cloud listing.  Still only one monitor.
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node, id_num=2)
        self.daemon.node_up(setup)
        self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_node_counted_after_boot_with_slow_listing(self):
        # Test that, after we boot a compute node, we assume it exists
        # even if it doesn't appear in the listing (e.g., because of
        # delays propagating tags).
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_booted_unlisted_node_counted(self):
        # A booted node that has never appeared in a cloud listing must
        # still count toward the wishlist, so no second setup starts.
        setup = self.start_node_boot(id_num=1)
        self.daemon.node_up(setup)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(1)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_setup.start.called,
                         "daemon did not count booted node toward wishlist")

    def test_booted_node_can_shutdown(self):
        # With an empty wishlist, a shutdown offer from the booted node's
        # monitor should make the daemon start a shutdown actor.
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_shutdown.start.called,
                        "daemon did not shut down booted node on offer")

    def test_booted_node_lifecycle(self):
        # Boot a node, shut it down, and check that both the shutdown
        # actor and the monitor are stopped.  A new wishlist entry must
        # then start a fresh setup actor.
        cloud_node = testutil.cloud_node_mock(6)
        setup = self.start_node_boot(cloud_node, id_num=6)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertShutdownCancellable(True)
        shutdown = self.node_shutdown.start().proxy()
        shutdown.cloud_node.get.return_value = cloud_node
        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
        self.assertTrue(shutdown.stop.called,
                        "shutdown actor not stopped after finishing")
        self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
                        "monitor for booted node not stopped after shutdown")
        self.daemon.update_server_wishlist(
            [testutil.MockSize(2)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called,
                        "second node not started after booted node stopped")

    def test_booted_node_shut_down_when_never_listed(self):
        # The booted node never shows up in a cloud listing.  Once the
        # timer fires, a non-cancellable shutdown should be started.
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.assertFalse(self.node_shutdown.start.called)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)

    def test_booted_node_shut_down_when_never_paired(self):
        # The node is listed by the cloud, but no Arvados node update is
        # ever sent for it.  Once the timer fires, a non-cancellable
        # shutdown should be started.
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)

    def test_node_that_pairs_not_considered_failed_boot(self):
        # The node appears in both the cloud and Arvados listings before
        # the timer fires, so no shutdown should be started.
        cloud_node = testutil.cloud_node_mock(3)
        arv_node = testutil.arvados_node_mock(3)
        setup = self.start_node_boot(cloud_node, arv_node)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_booting_nodes_shut_down(self):
        # When the wishlist empties while a node is still booting, the
        # daemon should call stop_if_no_cloud_node on the setup actor.
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(
            self.node_setup.start().proxy().stop_if_no_cloud_node.called)

    def test_shutdown_declined_at_wishlist_capacity(self):
        # One node exists and one is wanted, so a shutdown offer must be
        # declined.
        cloud_node = testutil.cloud_node_mock(1)
        size = testutil.MockSize(1)
        self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_shutdown_declined_below_min_nodes(self):
        # The wishlist is empty, but min_nodes=1 must keep the only node
        # running.
        cloud_node = testutil.cloud_node_mock(1)
        self.make_daemon(cloud_nodes=[cloud_node], min_nodes=1)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_shutdown_accepted_below_capacity(self):
        # With an empty wishlist and the default min_nodes=0, a shutdown
        # offer should be accepted.
        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_shutdown.start.called)

    def test_shutdown_declined_when_idle_and_job_queued(self):
        # Node 3 has a job and node 4 is idle, with one wishlist entry
        # queued.  A shutdown offer from the idle node must be declined.
        cloud_nodes = [testutil.cloud_node_mock(n) for n in [3, 4]]
        arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                     testutil.arvados_node_mock(4, job_uuid=None)]
        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(1)])
        self.assertEqual(2, self.alive_monitor_count())
        # Find the monitor watching the idle node (the last cloud node).
        for mon_ref in self.monitor_list():
            monitor = mon_ref.proxy()
            if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
                break
        else:
            self.fail("monitor for idle node not found")
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_node_shutdown_after_cancelled_shutdown(self):
        # The first shutdown reports success=False.  After it finishes,
        # a second shutdown offer for the same node must be accepted.
        cloud_node = testutil.cloud_node_mock(5)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        shutdown_proxy = self.node_shutdown.start().proxy
        shutdown_proxy().cloud_node.get.return_value = cloud_node
        shutdown_proxy().success.get.return_value = False
        # Clear the calls made while configuring the mock above.
        shutdown_proxy.reset_mock()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(shutdown_proxy.called)
        self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
        shutdown_proxy().success.get.return_value = True
        shutdown_proxy.reset_mock()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(shutdown_proxy.called)

    def test_nodes_shutting_down_replaced_below_max_nodes(self):
        # While the only node is shutting down, a new wishlist entry
        # should start a replacement (default max_nodes=8).
        cloud_node = testutil.cloud_node_mock(6)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(self.node_shutdown.start.called)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(6)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
        # Same as the test above, but with max_nodes=1 no replacement
        # may start while the node is still shutting down.
        cloud_node = testutil.cloud_node_mock(7)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                         max_nodes=1)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(self.node_shutdown.start.called)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(7)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_setup.start.called)

    def test_nodes_shutting_down_count_against_excess(self):
        # Two nodes exist and one is wanted.  Both monitors offer to shut
        # down, but only one shutdown actor may be started.
        cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
        arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
        self.assertEqual(2, self.alive_monitor_count())
        for mon_ref in self.monitor_list():
            self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
        self.assertEqual(1, self.node_shutdown.start.call_count)

    def test_clean_shutdown_waits_for_node_setup_finish(self):
        # shutdown() asks the in-progress setup actor to stop if it has
        # no cloud node yet.  Once that setup reports node_up it is
        # stopped, and the daemon itself stops after the timer fires.
        new_node = self.start_node_boot()
        self.daemon.shutdown().get(self.TIMEOUT)
        self.assertTrue(new_node.stop_if_no_cloud_node.called)
        self.daemon.node_up(new_node).get(self.TIMEOUT)
        self.assertTrue(new_node.stop.called)
        self.timer.deliver()
        self.assertTrue(
            self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))

    def test_wishlist_ignored_after_shutdown(self):
        # Only the setup started before shutdown() should exist; a larger
        # wishlist sent afterwards must not start more.
        size = testutil.MockSize(2)
        self.make_daemon(want_sizes=[size])
        self.daemon.shutdown().get(self.TIMEOUT)
        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)