Add 'apps/arv-web/' from commit 'f9732ad8460d013c2f28363655d0d1b91894dca5'
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import time
6 import unittest
7
8 import mock
9 import pykka
10
11 import arvnodeman.daemon as nmdaemon
12 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
13 from . import testutil
14
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                     unittest.TestCase):
    """Tests for NodeManagerDaemonActor driven through mock collaborators.

    Each test builds a daemon with make_daemon() (directly or through
    start_node_boot()), feeds it cloud node listings, Arvados node
    listings and server wishlists, and then inspects the mock setup and
    shutdown actor classes and the live ComputeNodeMonitorActors.
    """

    def new_setup_proxy(self):
        """Return a fresh setup-proxy mock and remember it as last_setup.

        Installed as a side effect in make_daemon() so that every time
        the daemon starts a setup actor, it gets a new mock object back.
        """
        self.last_setup = mock.MagicMock(name='setup_proxy_mock')
        return self.last_setup

    def make_daemon(self, cloud_nodes=(), arvados_nodes=(), want_sizes=(),
                    min_size=testutil.MockSize(1), min_nodes=0, max_nodes=8):
        """Start a daemon actor wired to mocks and store it on self.daemon.

        cloud_nodes, arvados_nodes and want_sizes are delivered to the
        daemon, in that order, as its initial cloud listing, Arvados
        listing and server wishlist.  Pass None for any of them to skip
        that initial update.  The defaults are immutable and each update
        is sent as a fresh list, so no list object is shared between
        daemons started by different tests.

        min_size, min_nodes and max_nodes are forwarded to the daemon
        constructor unchanged.
        """
        for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
            setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
        self.arv_factory = mock.MagicMock(name='arvados_mock')
        self.cloud_factory = mock.MagicMock(name='cloud_mock')
        self.cloud_factory().node_start_time.return_value = time.time()
        self.cloud_updates = mock.MagicMock(name='updates_mock')
        self.timer = testutil.MockTimer(deliver_immediately=False)
        self.node_setup = mock.MagicMock(name='setup_mock')
        self.node_setup.start().proxy.side_effect = self.new_setup_proxy
        # Configuring the side effect above called start(); clear that so
        # tests only see calls made by the daemon.
        self.node_setup.reset_mock()
        self.node_shutdown = mock.MagicMock(name='shutdown_mock')
        # NOTE(review): [54, 5, 1] and 600, 1800, 3600 are positional
        # constructor arguments; presumably shutdown windows and
        # staleness/boot-failure timeouts -- confirm against
        # NodeManagerDaemonActor's signature.
        self.daemon = nmdaemon.NodeManagerDaemonActor.start(
            self.server_wishlist_poller, self.arvados_nodes_poller,
            self.cloud_nodes_poller, self.cloud_updates, self.timer,
            self.arv_factory, self.cloud_factory,
            [54, 5, 1], min_size, min_nodes, max_nodes, 600, 1800, 3600,
            self.node_setup, self.node_shutdown).proxy()
        if cloud_nodes is not None:
            self.daemon.update_cloud_nodes(
                list(cloud_nodes)).get(self.TIMEOUT)
        if arvados_nodes is not None:
            self.daemon.update_arvados_nodes(
                list(arvados_nodes)).get(self.TIMEOUT)
        if want_sizes is not None:
            self.daemon.update_server_wishlist(
                list(want_sizes)).get(self.TIMEOUT)

    def monitor_list(self):
        """Return the registered ComputeNodeMonitorActor references."""
        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)

    def alive_monitor_count(self):
        """Return how many registered monitor actors report is_alive()."""
        return sum(1 for actor in self.monitor_list() if actor.is_alive())

    def assertShutdownCancellable(self, expected=True):
        """Assert a shutdown actor was started with cancellable=expected.

        Inspects the keyword arguments of the most recent
        node_shutdown.start call.
        """
        self.assertTrue(self.node_shutdown.start.called)
        self.assertIs(expected,
                      self.node_shutdown.start.call_args[1]['cancellable'],
                      "ComputeNodeShutdownActor incorrectly cancellable")

    def test_easy_node_creation(self):
        """A one-entry wishlist with no nodes starts a setup actor."""
        size = testutil.MockSize(1)
        self.make_daemon(want_sizes=[size])
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def check_monitors_arvados_nodes(self, *arv_nodes):
        """Assert live monitors hold exactly arv_nodes, in any order."""
        pairings = [monitor.proxy().arvados_node
                    for monitor in self.monitor_list() if monitor.is_alive()]
        self.assertItemsEqual(arv_nodes, pykka.get_all(pairings, self.TIMEOUT))

    def test_node_pairing(self):
        """A cloud node and its Arvados record end up on one monitor."""
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        self.make_daemon([cloud_node], [arv_node])
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)

    def test_node_pairing_after_arvados_update(self):
        """Pairing happens once a later Arvados listing has an address."""
        cloud_node = testutil.cloud_node_mock(2)
        # The initial Arvados record has no IP address.
        self.make_daemon([cloud_node],
                         [testutil.arvados_node_mock(2, ip_address=None)])
        arv_node = testutil.arvados_node_mock(2)
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)

    def test_arvados_node_un_and_re_paired(self):
        """An Arvados node re-pairs when its cloud node is relisted."""
        arv_node = testutil.arvados_node_mock(3)
        self.make_daemon([testutil.cloud_node_mock(3)], [arv_node])
        self.check_monitors_arvados_nodes(arv_node)
        # An empty cloud listing leaves no live monitors.
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.assertEqual(0, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)

    def test_old_arvados_node_not_double_assigned(self):
        """An aged Arvados record is handed to only one of two setups."""
        arv_node = testutil.arvados_node_mock(3, age=9000)
        size = testutil.MockSize(3)
        self.make_daemon(arvados_nodes=[arv_node])
        self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
        self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        used_nodes = [call[1].get('arvados_node')
                      for call in self.node_setup.start.call_args_list]
        self.assertEqual(2, len(used_nodes))
        self.assertIn(arv_node, used_nodes)
        self.assertIn(None, used_nodes)

    def test_node_count_satisfied(self):
        """No setup actor is started when a listed node covers the wishlist."""
        self.make_daemon([testutil.cloud_node_mock()],
                         want_sizes=[testutil.MockSize(1)])
        self.stop_proxy(self.daemon)
        # Check start.called, like every other test here: nothing in this
        # file calls the node_setup mock itself, so node_setup.called
        # could never be true and would make this assertion vacuous.
        self.assertFalse(self.node_setup.start.called)

    def test_booting_nodes_counted(self):
        """Repeating the wishlist does not start a second setup actor."""
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        server_wishlist = [testutil.MockSize(1)] * 2
        self.make_daemon([cloud_node], [arv_node], server_wishlist)
        # Blocking attribute read through the proxy; presumably used as a
        # barrier so earlier messages are handled before asserting.
        self.daemon.max_nodes.get(self.TIMEOUT)
        self.assertTrue(self.node_setup.start.called)
        self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)

    def test_boot_new_node_when_all_nodes_busy(self):
        """A wishlist entry starts a setup when the only node has a job."""
        arv_node = testutil.arvados_node_mock(2, job_uuid=True)
        self.make_daemon([testutil.cloud_node_mock(2)], [arv_node],
                         [testutil.MockSize(2)])
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def test_boot_new_node_below_min_nodes(self):
        """With min_nodes=2, setups use the wishlist size, then min_size."""
        min_size = testutil.MockSize(1)
        wish_size = testutil.MockSize(3)
        # want_sizes=None skips the initial wishlist update.
        self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
        self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual([wish_size, min_size],
                         [call[1].get('cloud_size')
                          for call in self.node_setup.start.call_args_list])

    def test_no_new_node_when_ge_min_nodes_busy(self):
        """Three busy nodes, min_nodes=2, empty wishlist: no setup."""
        cloud_nodes = [testutil.cloud_node_mock(n) for n in range(1, 4)]
        arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
                     for n in range(1, 4)]
        self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
        self.stop_proxy(self.daemon)
        self.assertEqual(0, self.node_setup.start.call_count)

    def test_no_new_node_when_max_nodes_busy(self):
        """With max_nodes=1 and one busy node, the wishlist is not served."""
        self.make_daemon([testutil.cloud_node_mock(3)],
                         [testutil.arvados_node_mock(3, job_uuid=True)],
                         [testutil.MockSize(3)],
                         max_nodes=1)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_setup.start.called)

    def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
        """Start a daemon that begins one node boot; return the setup mock.

        The daemon gets a single-entry wishlist of MockSize(id_num).
        The returned setup proxy mock is configured to report cloud_node
        and arv_node, which default to testutil mocks built from id_num.
        """
        if cloud_node is None:
            cloud_node = testutil.cloud_node_mock(id_num)
        if arv_node is None:
            arv_node = testutil.arvados_node_mock(id_num)
        self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
        # Blocking attribute read through the proxy; presumably used as a
        # barrier so earlier messages are handled before asserting.
        self.daemon.max_nodes.get(self.TIMEOUT)
        self.assertEqual(1, self.node_setup.start.call_count)
        self.last_setup.cloud_node.get.return_value = cloud_node
        self.last_setup.arvados_node.get.return_value = arv_node
        return self.last_setup

    def test_no_duplication_when_booting_node_listed_fast(self):
        """Listing a node before node_up leaves exactly one monitor."""
        # Test that we don't start two ComputeNodeMonitorActors when
        # we learn about a booting node through a listing before we
        # get the "node up" message from CloudNodeSetupActor.
        cloud_node = testutil.cloud_node_mock(1)
        setup = self.start_node_boot(cloud_node)
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_no_duplication_when_booted_node_listed(self):
        """Listing a node after node_up leaves exactly one monitor."""
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node, id_num=2)
        self.daemon.node_up(setup)
        self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_node_counted_after_boot_with_slow_listing(self):
        """A booted node keeps its monitor after an empty cloud listing."""
        # Test that, after we boot a compute node, we assume it exists
        # even if it doesn't appear in the listing (e.g., because of
        # delays propagating tags).
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_booted_unlisted_node_counted(self):
        """A booted but unlisted node stops a second setup from starting."""
        setup = self.start_node_boot(id_num=1)
        self.daemon.node_up(setup)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(1)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)

    def test_booted_node_can_shutdown(self):
        """A booted node offering shutdown on an empty wishlist is stopped."""
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_shutdown.start.called,
                        "daemon did not shut down booted node on offer")

    def test_booted_node_lifecycle(self):
        """Follow one node through boot, shutdown and a later wishlist."""
        cloud_node = testutil.cloud_node_mock(6)
        setup = self.start_node_boot(cloud_node, id_num=6)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertShutdownCancellable(True)
        # MagicMock returns the same child for repeated calls, so this is
        # the proxy the daemon got from node_shutdown.start().proxy().
        shutdown = self.node_shutdown.start().proxy()
        shutdown.cloud_node.get.return_value = cloud_node
        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
        self.assertTrue(shutdown.stop.called,
                        "shutdown actor not stopped after finishing")
        self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
                        "monitor for booted node not stopped after shutdown")
        self.daemon.update_server_wishlist(
            [testutil.MockSize(2)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called,
                        "second node not started after booted node stopped")

    def test_booted_node_shut_down_when_never_listed(self):
        """After the timer fires, an unlisted node is shut down for good."""
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.assertFalse(self.node_shutdown.start.called)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)

    def test_booted_node_shut_down_when_never_paired(self):
        """After the timer fires, a listed but unpaired node is shut down."""
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)

    def test_node_that_pairs_not_considered_failed_boot(self):
        """A booted node that pairs is not shut down when the timer fires."""
        cloud_node = testutil.cloud_node_mock(3)
        arv_node = testutil.arvados_node_mock(3)
        setup = self.start_node_boot(cloud_node, arv_node)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_booting_nodes_shut_down(self):
        """Emptying the wishlist asks the in-progress setup to stop."""
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)

    def test_shutdown_declined_at_wishlist_capacity(self):
        """A shutdown offer is declined when the wishlist needs the node."""
        cloud_node = testutil.cloud_node_mock(1)
        size = testutil.MockSize(1)
        self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_shutdown_declined_below_min_nodes(self):
        """A shutdown offer is declined when min_nodes=1 and one node is up."""
        cloud_node = testutil.cloud_node_mock(1)
        self.make_daemon(cloud_nodes=[cloud_node], min_nodes=1)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_shutdown_accepted_below_capacity(self):
        """A shutdown offer is accepted when the wishlist is empty."""
        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_shutdown.start.called)

    def test_shutdown_declined_when_idle_and_job_queued(self):
        """An idle node's shutdown offer is declined while a job is wanted."""
        cloud_nodes = [testutil.cloud_node_mock(n) for n in [3, 4]]
        arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                     testutil.arvados_node_mock(4, job_uuid=None)]
        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(1)])
        self.assertEqual(2, self.alive_monitor_count())
        # Find the monitor for the last cloud node, whose Arvados record
        # has no job.
        for mon_ref in self.monitor_list():
            monitor = mon_ref.proxy()
            if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
                break
        else:
            self.fail("monitor for idle node not found")
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_node_shutdown_after_cancelled_shutdown(self):
        """A node can be offered for shutdown again after a failed one."""
        cloud_node = testutil.cloud_node_mock(5)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        shutdown_proxy = self.node_shutdown.start().proxy
        shutdown_proxy().cloud_node.get.return_value = cloud_node
        # The first shutdown reports failure.
        shutdown_proxy().success.get.return_value = False
        shutdown_proxy.reset_mock()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(shutdown_proxy.called)
        self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
        shutdown_proxy().success.get.return_value = True
        shutdown_proxy.reset_mock()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(shutdown_proxy.called)

    def test_nodes_shutting_down_replaced_below_max_nodes(self):
        """A node being shut down is replaced when the wishlist grows."""
        cloud_node = testutil.cloud_node_mock(6)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(self.node_shutdown.start.called)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(6)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
        """With max_nodes=1, a node being shut down is not replaced."""
        cloud_node = testutil.cloud_node_mock(7)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                         max_nodes=1)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(self.node_shutdown.start.called)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(7)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_setup.start.called)

    def test_nodes_shutting_down_count_against_excess(self):
        """Of two nodes with one wanted, only one shutdown is started."""
        cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
        arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
        self.assertEqual(2, self.alive_monitor_count())
        for mon_ref in self.monitor_list():
            self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
        self.assertEqual(1, self.node_shutdown.start.call_count)

    def test_clean_shutdown_waits_for_node_setup_finish(self):
        """The daemon stops only after a pending setup reports node_up."""
        new_node = self.start_node_boot()
        self.daemon.shutdown().get(self.TIMEOUT)
        self.assertTrue(new_node.stop_if_no_cloud_node.called)
        self.daemon.node_up(new_node).get(self.TIMEOUT)
        self.assertTrue(new_node.stop.called)
        self.timer.deliver()
        self.assertTrue(
            self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))

    def test_wishlist_ignored_after_shutdown(self):
        """A larger wishlist after shutdown() starts no further setups."""
        size = testutil.MockSize(2)
        self.make_daemon(want_sizes=[size])
        self.daemon.shutdown().get(self.TIMEOUT)
        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)