3198: Fix Operation(inode_cache) init. Break up tests into a few smaller
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import time
6 import unittest
7
8 import mock
9 import pykka
10
11 import arvnodeman.daemon as nmdaemon
12 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
13 from . import testutil
14
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                     unittest.TestCase):
    """Tests for NodeManagerDaemonActor driven entirely through mocks.

    Each test builds a daemon with make_daemon(), feeds it cloud node
    listings, Arvados node listings, and server wishlists, and then checks
    which setup/shutdown actors the daemon asked for via the mock factories
    stored on the test case (node_setup, node_shutdown).
    """

    def new_setup_proxy(self):
        """Return a fresh setup proxy mock and remember it as last_setup."""
        # Make sure that every time the daemon starts a setup actor,
        # it gets a new mock object back.
        self.last_setup = mock.MagicMock(name='setup_proxy_mock')
        return self.last_setup

    def make_daemon(self, cloud_nodes=(), arvados_nodes=(), want_sizes=(),
                    min_size=testutil.MockSize(1), min_nodes=0, max_nodes=8):
        """Start a daemon actor wired to mocks and send it initial updates.

        The started daemon's proxy is stored as self.daemon; the mocks it
        was built with are stored as attributes on the test case
        (*_poller, arv_factory, cloud_factory, cloud_updates, timer,
        node_setup, node_shutdown).

        Arguments:
        cloud_nodes, arvados_nodes, want_sizes -- Initial contents for the
          corresponding daemon update message.  Pass None to skip sending
          that update at all.  Each call sends the daemon a fresh list, so
          nothing is shared between tests through the default values.
        min_size, min_nodes, max_nodes -- Passed through positionally to
          the daemon constructor.
        """
        for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
            setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
        self.arv_factory = mock.MagicMock(name='arvados_mock')
        self.cloud_factory = mock.MagicMock(name='cloud_mock')
        self.cloud_factory().node_start_time.return_value = time.time()
        self.cloud_updates = mock.MagicMock(name='updates_mock')
        self.timer = testutil.MockTimer(deliver_immediately=False)
        self.node_setup = mock.MagicMock(name='setup_mock')
        self.node_setup.start().proxy.side_effect = self.new_setup_proxy
        # Configuring the side effect above called start(); forget that
        # call so tests only see starts requested by the daemon.
        self.node_setup.reset_mock()
        self.node_shutdown = mock.MagicMock(name='shutdown_mock')
        # NOTE(review): [54, 5, 1] and 600, 1800, 3600 are presumably the
        # shutdown windows and staleness/boot timeouts -- confirm against
        # the NodeManagerDaemonActor constructor signature.
        self.daemon = nmdaemon.NodeManagerDaemonActor.start(
            self.server_wishlist_poller, self.arvados_nodes_poller,
            self.cloud_nodes_poller, self.cloud_updates, self.timer,
            self.arv_factory, self.cloud_factory,
            [54, 5, 1], min_size, min_nodes, max_nodes, 600, 1800, 3600,
            self.node_setup, self.node_shutdown).proxy()
        # Send the initial updates in the same order as always: cloud
        # nodes, then Arvados nodes, then the wishlist.
        initial_updates = [
            (self.daemon.update_cloud_nodes, cloud_nodes),
            (self.daemon.update_arvados_nodes, arvados_nodes),
            (self.daemon.update_server_wishlist, want_sizes),
        ]
        for send_update, items in initial_updates:
            if items is not None:
                send_update(list(items)).get(self.TIMEOUT)

    def monitor_list(self):
        """Return actor refs for every registered ComputeNodeMonitorActor."""
        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)

    def monitored_arvados_nodes(self):
        """Return the arvados_node value of each live monitor actor.

        Monitors that have already stopped raise ActorDeadError when
        queried and are left out of the result.
        """
        pairings = []
        for future in [actor.proxy().arvados_node
                       for actor in self.monitor_list()]:
            try:
                pairings.append(future.get(self.TIMEOUT))
            except pykka.ActorDeadError:
                pass
        return pairings

    def alive_monitor_count(self):
        """Return how many monitor actors still answer queries."""
        return len(self.monitored_arvados_nodes())

    def assertShutdownCancellable(self, expected=True):
        """Assert a shutdown actor was started with cancellable=expected.

        Only the most recent node_shutdown.start call is inspected.
        """
        self.assertTrue(self.node_shutdown.start.called)
        self.assertIs(expected,
                      self.node_shutdown.start.call_args[1]['cancellable'],
                      "ComputeNodeShutdownActor incorrectly cancellable")

    def test_easy_node_creation(self):
        # One size on the wishlist and no nodes up: a setup must start.
        size = testutil.MockSize(1)
        self.make_daemon(want_sizes=[size])
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def check_monitors_arvados_nodes(self, *arv_nodes):
        """Assert the live monitors hold exactly arv_nodes, in any order."""
        self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())

    def test_node_pairing(self):
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        self.make_daemon([cloud_node], [arv_node])
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)

    def test_node_pairing_after_arvados_update(self):
        # The first Arvados record has no IP address; the monitor should
        # end up holding the later record delivered by the second update.
        cloud_node = testutil.cloud_node_mock(2)
        self.make_daemon([cloud_node],
                         [testutil.arvados_node_mock(2, ip_address=None)])
        arv_node = testutil.arvados_node_mock(2)
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)

    def test_arvados_node_un_and_re_paired(self):
        # We need to create the Arvados node mock after spinning up the daemon
        # to make sure it's new enough to pair with the cloud node.
        self.make_daemon([testutil.cloud_node_mock(3)], arvados_nodes=None)
        arv_node = testutil.arvados_node_mock(3)
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.check_monitors_arvados_nodes(arv_node)
        # Dropping the cloud node from the listing stops its monitor...
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.assertEqual(0, self.alive_monitor_count())
        # ...and listing it again pairs it with the same Arvados node.
        self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)

    def test_old_arvados_node_not_double_assigned(self):
        # Two setups are requested; the old Arvados node record may be
        # handed to only one of them, the other gets no record.
        arv_node = testutil.arvados_node_mock(3, age=9000)
        size = testutil.MockSize(3)
        self.make_daemon(arvados_nodes=[arv_node])
        self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
        self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        used_nodes = [call[1].get('arvados_node')
                      for call in self.node_setup.start.call_args_list]
        self.assertEqual(2, len(used_nodes))
        self.assertIn(arv_node, used_nodes)
        self.assertIn(None, used_nodes)

    def test_node_count_satisfied(self):
        self.make_daemon([testutil.cloud_node_mock()],
                         want_sizes=[testutil.MockSize(1)])
        self.stop_proxy(self.daemon)
        # Check start.called: the daemon only ever calls node_setup.start,
        # so node_setup.called is False no matter what the daemon did.
        self.assertFalse(self.node_setup.start.called)

    def test_booting_nodes_counted(self):
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        server_wishlist = [testutil.MockSize(1)] * 2
        self.make_daemon([cloud_node], [arv_node], server_wishlist)
        # Reading an attribute through the proxy and waiting on it is
        # presumably used as a barrier so earlier messages are handled.
        self.daemon.max_nodes.get(self.TIMEOUT)
        self.assertTrue(self.node_setup.start.called)
        # Repeating the same wishlist must not start a second setup.
        self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)

    def test_boot_new_node_when_all_nodes_busy(self):
        arv_node = testutil.arvados_node_mock(2, job_uuid=True)
        self.make_daemon([testutil.cloud_node_mock(2)], [arv_node],
                         [testutil.MockSize(2)])
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def test_boot_new_node_below_min_nodes(self):
        # With min_nodes=2 the daemon should boot the wished-for size
        # first and then a min_size node to reach the minimum.
        min_size = testutil.MockSize(1)
        wish_size = testutil.MockSize(3)
        self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
        self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual([wish_size, min_size],
                         [call[1].get('cloud_size')
                          for call in self.node_setup.start.call_args_list])

    def test_no_new_node_when_ge_min_nodes_busy(self):
        cloud_nodes = [testutil.cloud_node_mock(n) for n in range(1, 4)]
        arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
                     for n in range(1, 4)]
        self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
        self.stop_proxy(self.daemon)
        self.assertEqual(0, self.node_setup.start.call_count)

    def test_no_new_node_when_max_nodes_busy(self):
        self.make_daemon([testutil.cloud_node_mock(3)],
                         [testutil.arvados_node_mock(3, job_uuid=True)],
                         [testutil.MockSize(3)],
                         max_nodes=1)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_setup.start.called)

    def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
        """Make a daemon that starts one node setup; return the setup mock.

        The returned mock's cloud_node and arvados_node futures are
        primed to yield cloud_node and arv_node; missing ones are built
        from id_num.
        """
        if cloud_node is None:
            cloud_node = testutil.cloud_node_mock(id_num)
        if arv_node is None:
            arv_node = testutil.arvados_node_mock(id_num)
        self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
        self.daemon.max_nodes.get(self.TIMEOUT)
        self.assertEqual(1, self.node_setup.start.call_count)
        self.last_setup.cloud_node.get.return_value = cloud_node
        self.last_setup.arvados_node.get.return_value = arv_node
        return self.last_setup

    def test_no_duplication_when_booting_node_listed_fast(self):
        # Test that we don't start two ComputeNodeMonitorActors when
        # we learn about a booting node through a listing before we
        # get the "node up" message from CloudNodeSetupActor.
        cloud_node = testutil.cloud_node_mock(1)
        setup = self.start_node_boot(cloud_node)
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_no_duplication_when_booted_node_listed(self):
        # Same check as above with the messages in the opposite order:
        # "node up" first, then the cloud listing.
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node, id_num=2)
        self.daemon.node_up(setup)
        self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_node_counted_after_boot_with_slow_listing(self):
        # Test that, after we boot a compute node, we assume it exists
        # even if it doesn't appear in the listing (e.g., because of delays
        # propagating tags).
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

    def test_booted_unlisted_node_counted(self):
        # A booted node that was never listed still satisfies the
        # wishlist, so no second setup is started.
        setup = self.start_node_boot(id_num=1)
        self.daemon.node_up(setup)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(1)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)

    def test_booted_node_can_shutdown(self):
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_shutdown.start.called,
                        "daemon did not shut down booted node on offer")

    def test_booted_node_lifecycle(self):
        # Boot a node, shut it down, and check that a new node can be
        # started afterwards.
        cloud_node = testutil.cloud_node_mock(6)
        setup = self.start_node_boot(cloud_node, id_num=6)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertShutdownCancellable(True)
        # MagicMock returns the same child for repeated calls, so this is
        # the shutdown proxy object the daemon received.
        shutdown = self.node_shutdown.start().proxy()
        shutdown.cloud_node.get.return_value = cloud_node
        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
        self.assertTrue(shutdown.stop.called,
                        "shutdown actor not stopped after finishing")
        self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
                        "monitor for booted node not stopped after shutdown")
        self.daemon.update_server_wishlist(
            [testutil.MockSize(2)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called,
                        "second node not started after booted node stopped")

    def test_booted_node_shut_down_when_never_listed(self):
        setup = self.start_node_boot()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.assertFalse(self.node_shutdown.start.called)
        # Delivering the pending timer messages triggers the shutdown,
        # which must be started as non-cancellable.
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)

    def test_booted_node_shut_down_when_never_paired(self):
        # The cloud node is listed but no Arvados node update arrives
        # before the timer fires.
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)

    def test_node_that_pairs_not_considered_failed_boot(self):
        # Both the cloud listing and the Arvados listing arrive before
        # the timer fires, so no shutdown should be started.
        cloud_node = testutil.cloud_node_mock(3)
        arv_node = testutil.arvados_node_mock(3)
        setup = self.start_node_boot(cloud_node, arv_node)
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_booting_nodes_shut_down(self):
        # Emptying the wishlist while a node is still booting should ask
        # its setup actor to stop if it has no cloud node yet.
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)

    def test_shutdown_declined_at_wishlist_capacity(self):
        cloud_node = testutil.cloud_node_mock(1)
        size = testutil.MockSize(1)
        self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_shutdown_declined_below_min_nodes(self):
        cloud_node = testutil.cloud_node_mock(1)
        self.make_daemon(cloud_nodes=[cloud_node], min_nodes=1)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_shutdown_accepted_below_capacity(self):
        # Empty wishlist and min_nodes=0: the shutdown offer is accepted.
        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_shutdown.start.called)

    def test_shutdown_declined_when_idle_and_job_queued(self):
        # Node 3 has a job, node 4 does not, and one size is wished for;
        # the shutdown offer from node 4's monitor must be declined.
        cloud_nodes = [testutil.cloud_node_mock(n) for n in [3, 4]]
        arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                     testutil.arvados_node_mock(4, job_uuid=None)]
        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(1)])
        self.assertEqual(2, self.alive_monitor_count())
        # Find the monitor that watches the last cloud node (node 4).
        for mon_ref in self.monitor_list():
            monitor = mon_ref.proxy()
            if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
                break
        else:
            self.fail("monitor for idle node not found")
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)

    def test_node_shutdown_after_cancelled_shutdown(self):
        cloud_node = testutil.cloud_node_mock(5)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        shutdown_proxy = self.node_shutdown.start().proxy
        shutdown_proxy().cloud_node.get.return_value = cloud_node
        # First shutdown attempt reports failure (success is False).
        shutdown_proxy().success.get.return_value = False
        # Forget the calls made while configuring the mock above.
        shutdown_proxy.reset_mock()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(shutdown_proxy.called)
        self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
        # A later offer for the same node must start a shutdown again.
        shutdown_proxy().success.get.return_value = True
        shutdown_proxy.reset_mock()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(shutdown_proxy.called)

    def test_nodes_shutting_down_replaced_below_max_nodes(self):
        cloud_node = testutil.cloud_node_mock(6)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(self.node_shutdown.start.called)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(6)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_setup.start.called)

    def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
        # Same scenario as above, but max_nodes=1 leaves no room for a
        # replacement while the old node is still shutting down.
        cloud_node = testutil.cloud_node_mock(7)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                         max_nodes=1)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.assertTrue(self.node_shutdown.start.called)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(7)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_setup.start.called)

    def test_nodes_shutting_down_count_against_excess(self):
        # Two nodes up, one wished for: only one of the two shutdown
        # offers may be accepted.
        cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
        arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
        self.assertEqual(2, self.alive_monitor_count())
        for mon_ref in self.monitor_list():
            self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
        self.assertEqual(1, self.node_shutdown.start.call_count)

    def test_clean_shutdown_waits_for_node_setup_finish(self):
        new_node = self.start_node_boot()
        self.daemon.shutdown().get(self.TIMEOUT)
        self.assertTrue(new_node.stop_if_no_cloud_node.called)
        self.daemon.node_up(new_node).get(self.TIMEOUT)
        self.assertTrue(new_node.stop.called)
        self.timer.deliver()
        self.assertTrue(
            self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))

    def test_wishlist_ignored_after_shutdown(self):
        # Only the setup started before shutdown() may exist; the larger
        # wishlist sent afterwards must not start another one.
        size = testutil.MockSize(2)
        self.make_daemon(want_sizes=[size])
        self.daemon.shutdown().get(self.TIMEOUT)
        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)