Merge branch 'master' into 6507-node-manager-azure
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import time
6 import unittest
7
8 import mock
9 import pykka
10
11 import arvnodeman.daemon as nmdaemon
12 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
13 from . import testutil
14
15 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
16                                      unittest.TestCase):
17     def new_setup_proxy(self):
18         # Make sure that every time the daemon starts a setup actor,
19         # it gets a new mock object back.
20         self.last_setup = mock.MagicMock(name='setup_proxy_mock')
21         return self.last_setup
22
23     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
24                     min_size=testutil.MockSize(1), min_nodes=0, max_nodes=8):
25         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
26             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
27         self.arv_factory = mock.MagicMock(name='arvados_mock')
28         self.cloud_factory = mock.MagicMock(name='cloud_mock')
29         self.cloud_factory().node_start_time.return_value = time.time()
30         self.cloud_updates = mock.MagicMock(name='updates_mock')
31         self.timer = testutil.MockTimer(deliver_immediately=False)
32         self.node_setup = mock.MagicMock(name='setup_mock')
33         self.node_setup.start().proxy.side_effect = self.new_setup_proxy
34         self.node_setup.reset_mock()
35         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
36         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
37             self.server_wishlist_poller, self.arvados_nodes_poller,
38             self.cloud_nodes_poller, self.cloud_updates, self.timer,
39             self.arv_factory, self.cloud_factory,
40             [54, 5, 1], min_size, min_nodes, max_nodes, 600, 1800, 3600,
41             self.node_setup, self.node_shutdown).proxy()
42         if cloud_nodes is not None:
43             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
44         if arvados_nodes is not None:
45             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
46         if want_sizes is not None:
47             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
48
49     def monitor_list(self):
50         return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
51
52     def monitored_arvados_nodes(self):
53         pairings = []
54         for future in [actor.proxy().arvados_node
55                        for actor in self.monitor_list()]:
56             try:
57                 pairings.append(future.get(self.TIMEOUT))
58             except pykka.ActorDeadError:
59                 pass
60         return pairings
61
62     def alive_monitor_count(self):
63         return len(self.monitored_arvados_nodes())
64
65     def assertShutdownCancellable(self, expected=True):
66         self.assertTrue(self.node_shutdown.start.called)
67         self.assertIs(expected,
68                       self.node_shutdown.start.call_args[1]['cancellable'],
69                       "ComputeNodeShutdownActor incorrectly cancellable")
70
71     def test_easy_node_creation(self):
72         size = testutil.MockSize(1)
73         self.make_daemon(want_sizes=[size])
74         self.stop_proxy(self.daemon)
75         self.assertTrue(self.node_setup.start.called)
76
77     def check_monitors_arvados_nodes(self, *arv_nodes):
78         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
79
80     def test_node_pairing(self):
81         cloud_node = testutil.cloud_node_mock(1)
82         arv_node = testutil.arvados_node_mock(1)
83         self.make_daemon([cloud_node], [arv_node])
84         self.stop_proxy(self.daemon)
85         self.check_monitors_arvados_nodes(arv_node)
86
87     def test_node_pairing_after_arvados_update(self):
88         cloud_node = testutil.cloud_node_mock(2)
89         self.make_daemon([cloud_node],
90                          [testutil.arvados_node_mock(2, ip_address=None)])
91         arv_node = testutil.arvados_node_mock(2)
92         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
93         self.stop_proxy(self.daemon)
94         self.check_monitors_arvados_nodes(arv_node)
95
96     def test_arvados_node_un_and_re_paired(self):
97         # We need to create the Arvados node mock after spinning up the daemon
98         # to make sure it's new enough to pair with the cloud node.
99         self.make_daemon([testutil.cloud_node_mock(3)], arvados_nodes=None)
100         arv_node = testutil.arvados_node_mock(3)
101         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
102         self.check_monitors_arvados_nodes(arv_node)
103         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
104         self.assertEqual(0, self.alive_monitor_count())
105         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
106         self.stop_proxy(self.daemon)
107         self.check_monitors_arvados_nodes(arv_node)
108
109     def test_old_arvados_node_not_double_assigned(self):
110         arv_node = testutil.arvados_node_mock(3, age=9000)
111         size = testutil.MockSize(3)
112         self.make_daemon(arvados_nodes=[arv_node])
113         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
114         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
115         self.stop_proxy(self.daemon)
116         used_nodes = [call[1].get('arvados_node')
117                       for call in self.node_setup.start.call_args_list]
118         self.assertEqual(2, len(used_nodes))
119         self.assertIn(arv_node, used_nodes)
120         self.assertIn(None, used_nodes)
121
122     def test_node_count_satisfied(self):
123         self.make_daemon([testutil.cloud_node_mock()],
124                          want_sizes=[testutil.MockSize(1)])
125         self.stop_proxy(self.daemon)
126         self.assertFalse(self.node_setup.called)
127
128     def test_booting_nodes_counted(self):
129         cloud_node = testutil.cloud_node_mock(1)
130         arv_node = testutil.arvados_node_mock(1)
131         server_wishlist = [testutil.MockSize(1)] * 2
132         self.make_daemon([cloud_node], [arv_node], server_wishlist)
133         self.daemon.max_nodes.get(self.TIMEOUT)
134         self.assertTrue(self.node_setup.start.called)
135         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
136         self.stop_proxy(self.daemon)
137         self.assertEqual(1, self.node_setup.start.call_count)
138
139     def test_boot_new_node_when_all_nodes_busy(self):
140         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
141         self.make_daemon([testutil.cloud_node_mock(2)], [arv_node],
142                          [testutil.MockSize(2)])
143         self.stop_proxy(self.daemon)
144         self.assertTrue(self.node_setup.start.called)
145
146     def test_boot_new_node_below_min_nodes(self):
147         min_size = testutil.MockSize(1)
148         wish_size = testutil.MockSize(3)
149         self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
150         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
151         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
152         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
153         self.stop_proxy(self.daemon)
154         self.assertEqual([wish_size, min_size],
155                          [call[1].get('cloud_size')
156                           for call in self.node_setup.start.call_args_list])
157
158     def test_no_new_node_when_ge_min_nodes_busy(self):
159         cloud_nodes = [testutil.cloud_node_mock(n) for n in range(1, 4)]
160         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
161                      for n in range(1, 4)]
162         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
163         self.stop_proxy(self.daemon)
164         self.assertEqual(0, self.node_setup.start.call_count)
165
166     def test_no_new_node_when_max_nodes_busy(self):
167         self.make_daemon([testutil.cloud_node_mock(3)],
168                          [testutil.arvados_node_mock(3, job_uuid=True)],
169                          [testutil.MockSize(3)],
170                          max_nodes=1)
171         self.stop_proxy(self.daemon)
172         self.assertFalse(self.node_setup.start.called)
173
174     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
175         if cloud_node is None:
176             cloud_node = testutil.cloud_node_mock(id_num)
177         if arv_node is None:
178             arv_node = testutil.arvados_node_mock(id_num)
179         self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
180         self.daemon.max_nodes.get(self.TIMEOUT)
181         self.assertEqual(1, self.node_setup.start.call_count)
182         self.last_setup.cloud_node.get.return_value = cloud_node
183         self.last_setup.arvados_node.get.return_value = arv_node
184         return self.last_setup
185
186     def test_no_new_node_when_booted_node_not_usable(self):
187         cloud_node = testutil.cloud_node_mock(4)
188         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
189         setup = self.start_node_boot(cloud_node, arv_node)
190         self.daemon.node_up(setup).get(self.TIMEOUT)
191         self.assertEqual(1, self.alive_monitor_count())
192         self.daemon.update_cloud_nodes([cloud_node])
193         self.daemon.update_arvados_nodes([arv_node])
194         self.daemon.update_server_wishlist(
195             [testutil.MockSize(1)]).get(self.TIMEOUT)
196         self.stop_proxy(self.daemon)
197         self.assertEqual(1, self.node_setup.start.call_count)
198
199     def test_no_duplication_when_booting_node_listed_fast(self):
200         # Test that we don't start two ComputeNodeMonitorActors when
201         # we learn about a booting node through a listing before we
202         # get the "node up" message from CloudNodeSetupActor.
203         cloud_node = testutil.cloud_node_mock(1)
204         setup = self.start_node_boot(cloud_node)
205         self.daemon.update_cloud_nodes([cloud_node])
206         self.daemon.node_up(setup).get(self.TIMEOUT)
207         self.assertEqual(1, self.alive_monitor_count())
208
209     def test_no_duplication_when_booted_node_listed(self):
210         cloud_node = testutil.cloud_node_mock(2)
211         setup = self.start_node_boot(cloud_node, id_num=2)
212         self.daemon.node_up(setup)
213         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
214         self.assertEqual(1, self.alive_monitor_count())
215
216     def test_node_counted_after_boot_with_slow_listing(self):
217         # Test that, after we boot a compute node, we assume it exists
218         # even it doesn't appear in the listing (e.g., because of delays
219         # propagating tags).
220         setup = self.start_node_boot()
221         self.daemon.node_up(setup).get(self.TIMEOUT)
222         self.assertEqual(1, self.alive_monitor_count())
223         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
224         self.assertEqual(1, self.alive_monitor_count())
225
226     def test_booted_unlisted_node_counted(self):
227         setup = self.start_node_boot(id_num=1)
228         self.daemon.node_up(setup)
229         self.daemon.update_server_wishlist(
230             [testutil.MockSize(1)]).get(self.TIMEOUT)
231         self.stop_proxy(self.daemon)
232         self.assertEqual(1, self.node_setup.start.call_count)
233
234     def test_booted_node_can_shutdown(self):
235         setup = self.start_node_boot()
236         self.daemon.node_up(setup).get(self.TIMEOUT)
237         self.assertEqual(1, self.alive_monitor_count())
238         monitor = self.monitor_list()[0].proxy()
239         self.daemon.update_server_wishlist([])
240         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
241         self.stop_proxy(self.daemon)
242         self.assertTrue(self.node_shutdown.start.called,
243                         "daemon did not shut down booted node on offer")
244
245     def test_booted_node_lifecycle(self):
246         cloud_node = testutil.cloud_node_mock(6)
247         setup = self.start_node_boot(cloud_node, id_num=6)
248         self.daemon.node_up(setup).get(self.TIMEOUT)
249         self.assertEqual(1, self.alive_monitor_count())
250         monitor = self.monitor_list()[0].proxy()
251         self.daemon.update_server_wishlist([])
252         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
253         self.assertShutdownCancellable(True)
254         shutdown = self.node_shutdown.start().proxy()
255         shutdown.cloud_node.get.return_value = cloud_node
256         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
257         self.assertTrue(shutdown.stop.called,
258                         "shutdown actor not stopped after finishing")
259         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
260                         "monitor for booted node not stopped after shutdown")
261         self.daemon.update_server_wishlist(
262             [testutil.MockSize(2)]).get(self.TIMEOUT)
263         self.stop_proxy(self.daemon)
264         self.assertTrue(self.node_setup.start.called,
265                         "second node not started after booted node stopped")
266
267     def test_booted_node_shut_down_when_never_listed(self):
268         setup = self.start_node_boot()
269         self.daemon.node_up(setup).get(self.TIMEOUT)
270         self.assertEqual(1, self.alive_monitor_count())
271         self.assertFalse(self.node_shutdown.start.called)
272         self.timer.deliver()
273         self.stop_proxy(self.daemon)
274         self.assertShutdownCancellable(False)
275
276     def test_booted_node_shut_down_when_never_paired(self):
277         cloud_node = testutil.cloud_node_mock(2)
278         setup = self.start_node_boot(cloud_node)
279         self.daemon.node_up(setup).get(self.TIMEOUT)
280         self.assertEqual(1, self.alive_monitor_count())
281         self.daemon.update_cloud_nodes([cloud_node])
282         self.timer.deliver()
283         self.stop_proxy(self.daemon)
284         self.assertShutdownCancellable(False)
285
286     def test_booted_node_shut_down_when_never_working(self):
287         cloud_node = testutil.cloud_node_mock(4)
288         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
289         setup = self.start_node_boot(cloud_node, arv_node)
290         self.daemon.node_up(setup).get(self.TIMEOUT)
291         self.assertEqual(1, self.alive_monitor_count())
292         self.daemon.update_cloud_nodes([cloud_node])
293         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
294         self.timer.deliver()
295         self.stop_proxy(self.daemon)
296         self.assertShutdownCancellable(False)
297
298     def test_node_that_pairs_not_considered_failed_boot(self):
299         cloud_node = testutil.cloud_node_mock(3)
300         arv_node = testutil.arvados_node_mock(3)
301         setup = self.start_node_boot(cloud_node, arv_node)
302         self.daemon.node_up(setup).get(self.TIMEOUT)
303         self.assertEqual(1, self.alive_monitor_count())
304         self.daemon.update_cloud_nodes([cloud_node])
305         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
306         self.timer.deliver()
307         self.stop_proxy(self.daemon)
308         self.assertFalse(self.node_shutdown.start.called)
309
310     def test_node_that_pairs_busy_not_considered_failed_boot(self):
311         cloud_node = testutil.cloud_node_mock(5)
312         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
313         setup = self.start_node_boot(cloud_node, arv_node)
314         self.daemon.node_up(setup).get(self.TIMEOUT)
315         self.assertEqual(1, self.alive_monitor_count())
316         self.daemon.update_cloud_nodes([cloud_node])
317         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
318         self.timer.deliver()
319         self.stop_proxy(self.daemon)
320         self.assertFalse(self.node_shutdown.start.called)
321
322     def test_booting_nodes_shut_down(self):
323         self.make_daemon(want_sizes=[testutil.MockSize(1)])
324         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
325         self.stop_proxy(self.daemon)
326         self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
327
328     def test_all_booting_nodes_tried_to_shut_down(self):
329         size = testutil.MockSize(2)
330         self.make_daemon(want_sizes=[size])
331         self.daemon.max_nodes.get(self.TIMEOUT)
332         setup1 = self.last_setup
333         setup1.stop_if_no_cloud_node().get.return_value = False
334         setup1.stop_if_no_cloud_node.reset_mock()
335         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
336         self.daemon.max_nodes.get(self.TIMEOUT)
337         self.assertIsNot(setup1, self.last_setup)
338         self.last_setup.stop_if_no_cloud_node().get.return_value = True
339         self.last_setup.stop_if_no_cloud_node.reset_mock()
340         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
341         self.daemon.max_nodes.get(self.TIMEOUT)
342         self.stop_proxy(self.daemon)
343         self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
344         self.assertTrue(setup1.stop_if_no_cloud_node.called)
345
346     def test_shutdown_declined_at_wishlist_capacity(self):
347         cloud_node = testutil.cloud_node_mock(1)
348         size = testutil.MockSize(1)
349         self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
350         self.assertEqual(1, self.alive_monitor_count())
351         monitor = self.monitor_list()[0].proxy()
352         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
353         self.stop_proxy(self.daemon)
354         self.assertFalse(self.node_shutdown.start.called)
355
356     def test_shutdown_declined_below_min_nodes(self):
357         cloud_node = testutil.cloud_node_mock(1)
358         self.make_daemon(cloud_nodes=[cloud_node], min_nodes=1)
359         self.assertEqual(1, self.alive_monitor_count())
360         monitor = self.monitor_list()[0].proxy()
361         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
362         self.stop_proxy(self.daemon)
363         self.assertFalse(self.node_shutdown.start.called)
364
365     def test_shutdown_accepted_below_capacity(self):
366         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
367         self.assertEqual(1, self.alive_monitor_count())
368         monitor = self.monitor_list()[0].proxy()
369         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
370         self.stop_proxy(self.daemon)
371         self.assertTrue(self.node_shutdown.start.called)
372
373     def test_shutdown_declined_when_idle_and_job_queued(self):
374         cloud_nodes = [testutil.cloud_node_mock(n) for n in [3, 4]]
375         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
376                      testutil.arvados_node_mock(4, job_uuid=None)]
377         self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(1)])
378         self.assertEqual(2, self.alive_monitor_count())
379         for mon_ref in self.monitor_list():
380             monitor = mon_ref.proxy()
381             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
382                 break
383         else:
384             self.fail("monitor for idle node not found")
385         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
386         self.stop_proxy(self.daemon)
387         self.assertFalse(self.node_shutdown.start.called)
388
389     def test_node_shutdown_after_cancelled_shutdown(self):
390         cloud_node = testutil.cloud_node_mock(5)
391         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
392         self.assertEqual(1, self.alive_monitor_count())
393         monitor = self.monitor_list()[0].proxy()
394         shutdown_proxy = self.node_shutdown.start().proxy
395         shutdown_proxy().cloud_node.get.return_value = cloud_node
396         shutdown_proxy().success.get.return_value = False
397         shutdown_proxy.reset_mock()
398         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
399         self.assertTrue(shutdown_proxy.called)
400         self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
401         shutdown_proxy().success.get.return_value = True
402         shutdown_proxy.reset_mock()
403         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
404         self.assertTrue(shutdown_proxy.called)
405
406     def test_nodes_shutting_down_replaced_below_max_nodes(self):
407         cloud_node = testutil.cloud_node_mock(6)
408         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
409         self.assertEqual(1, self.alive_monitor_count())
410         monitor = self.monitor_list()[0].proxy()
411         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
412         self.assertTrue(self.node_shutdown.start.called)
413         self.daemon.update_server_wishlist(
414             [testutil.MockSize(6)]).get(self.TIMEOUT)
415         self.stop_proxy(self.daemon)
416         self.assertTrue(self.node_setup.start.called)
417
418     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
419         cloud_node = testutil.cloud_node_mock(7)
420         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
421                          max_nodes=1)
422         self.assertEqual(1, self.alive_monitor_count())
423         monitor = self.monitor_list()[0].proxy()
424         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
425         self.assertTrue(self.node_shutdown.start.called)
426         self.daemon.update_server_wishlist(
427             [testutil.MockSize(7)]).get(self.TIMEOUT)
428         self.stop_proxy(self.daemon)
429         self.assertFalse(self.node_setup.start.called)
430
431     def test_nodes_shutting_down_count_against_excess(self):
432         cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
433         arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
434         self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
435         self.assertEqual(2, self.alive_monitor_count())
436         for mon_ref in self.monitor_list():
437             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
438         self.assertEqual(1, self.node_shutdown.start.call_count)
439
440     def test_clean_shutdown_waits_for_node_setup_finish(self):
441         new_node = self.start_node_boot()
442         new_node.stop_if_no_cloud_node().get.return_value = False
443         new_node.stop_if_no_cloud_node.reset_mock()
444         self.daemon.shutdown().get(self.TIMEOUT)
445         self.assertTrue(new_node.stop_if_no_cloud_node.called)
446         self.daemon.node_up(new_node).get(self.TIMEOUT)
447         self.assertTrue(new_node.stop.called)
448         self.timer.deliver()
449         self.assertTrue(
450             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
451
452     def test_wishlist_ignored_after_shutdown(self):
453         new_node = self.start_node_boot()
454         new_node.stop_if_no_cloud_node().get.return_value = False
455         new_node.stop_if_no_cloud_node.reset_mock()
456         self.daemon.shutdown().get(self.TIMEOUT)
457         size = testutil.MockSize(2)
458         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
459         self.timer.deliver()
460         self.stop_proxy(self.daemon)
461         self.assertEqual(1, self.node_setup.start.call_count)