12341: Ensure shutdown_actor.stop() is called on node_finished_shutdown.
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import time
9 import unittest
10
11 import mock
12 import pykka
13
14 import arvnodeman.daemon as nmdaemon
15 import arvnodeman.status as status
16 from arvnodeman.jobqueue import ServerCalculator
17 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
18 from . import testutil
19 from . import test_status
20 import logging
21
22 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
23                                      unittest.TestCase):
24
25     def busywait(self, f):
26         n = 0
27         while not f() and n < 200:
28             time.sleep(.1)
29             self.daemon.ping().get(self.TIMEOUT)
30             n += 1
31         self.assertTrue(f())
32
33     def mock_node_start(self, **kwargs):
34         # Make sure that every time the daemon starts a setup actor,
35         # it gets a new mock object back.
36         get_cloud_size = mock.MagicMock()
37         get_cloud_size.get.return_value = kwargs["cloud_size"]
38         mock_actor = mock.MagicMock()
39         mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
40                                           cloud_size=get_cloud_size,
41                                           actor_ref=mock_actor)
42         mock_actor.proxy.return_value = mock_proxy
43         mock_actor.tell_proxy.return_value = mock_proxy
44
45         self.last_setup = mock_proxy
46         return mock_actor
47
48     def mock_node_shutdown(self, **kwargs):
49         # Make sure that every time the daemon starts a shutdown actor,
50         # it gets a new mock object back.
51         get_cloud_node = mock.MagicMock()
52         if "node_monitor" in kwargs:
53             get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
54         mock_actor = mock.MagicMock()
55         mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
56                                           cloud_node=get_cloud_node,
57                                           actor_ref=mock_actor)
58
59         mock_actor.proxy.return_value = mock_proxy
60         self.last_shutdown = mock_proxy
61
62         return mock_actor
63
64     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
65                     avail_sizes=None,
66                     min_nodes=0, max_nodes=8,
67                     shutdown_windows=[54, 5, 1],
68                     max_total_price=None):
69         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
70             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
71
72         if not avail_sizes:
73             if cloud_nodes or want_sizes:
74                 avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
75             else:
76                 avail_sizes=[(testutil.MockSize(1), {"cores": 1})]
77
78         self.arv_factory = mock.MagicMock(name='arvados_mock')
79         api_client = mock.MagicMock(name='api_client')
80         api_client.nodes().create().execute.side_effect = \
81             [testutil.arvados_node_mock(1),
82              testutil.arvados_node_mock(2)]
83         self.arv_factory.return_value = api_client
84
85         self.cloud_factory = mock.MagicMock(name='cloud_mock')
86         self.cloud_factory().node_start_time.return_value = time.time()
87         self.cloud_updates = mock.MagicMock(name='updates_mock')
88         self.timer = testutil.MockTimer(deliver_immediately=False)
89         self.cloud_factory().node_id.side_effect = lambda node: node.id
90         self.cloud_factory().broken.return_value = False
91
92         self.node_setup = mock.MagicMock(name='setup_mock')
93         self.node_setup.start.side_effect = self.mock_node_start
94         self.node_setup.reset_mock()
95
96         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
97         self.node_shutdown.start.side_effect = self.mock_node_shutdown
98
99         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
100             self.server_wishlist_poller, self.arvados_nodes_poller,
101             self.cloud_nodes_poller, self.cloud_updates, self.timer,
102             self.arv_factory, self.cloud_factory,
103             shutdown_windows, ServerCalculator(avail_sizes),
104             min_nodes, max_nodes, 600, 1800, 3600,
105             self.node_setup, self.node_shutdown,
106             max_total_price=max_total_price).proxy()
107         if arvados_nodes is not None:
108             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
109         if cloud_nodes is not None:
110             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
111         if want_sizes is not None:
112             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
113
114     def monitor_list(self):
115         return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
116
117     def monitored_arvados_nodes(self, include_unpaired=True):
118         pairings = []
119         for future in [actor.proxy().arvados_node
120                        for actor in self.monitor_list()]:
121             try:
122                 g = future.get(self.TIMEOUT)
123                 if g or include_unpaired:
124                     pairings.append(g)
125             except pykka.ActorDeadError:
126                 pass
127         return pairings
128
129     def alive_monitor_count(self):
130         return len(self.monitored_arvados_nodes())
131
132     def paired_monitor_count(self):
133         return len(self.monitored_arvados_nodes(False))
134
135     def assertShutdownCancellable(self, expected=True):
136         self.assertTrue(self.node_shutdown.start.called)
137         self.assertIs(expected,
138                       self.node_shutdown.start.call_args[1]['cancellable'],
139                       "ComputeNodeShutdownActor incorrectly cancellable")
140
141     def test_easy_node_creation(self):
142         size = testutil.MockSize(1)
143         self.make_daemon(want_sizes=[size])
144         self.busywait(lambda: self.node_setup.start.called)
145
146     def check_monitors_arvados_nodes(self, *arv_nodes):
147         self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
148         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
149
150     def test_node_pairing(self):
151         cloud_node = testutil.cloud_node_mock(1)
152         arv_node = testutil.arvados_node_mock(1)
153         self.make_daemon([cloud_node], [arv_node])
154         self.check_monitors_arvados_nodes(arv_node)
155
156     def test_node_pairing_after_arvados_update(self):
157         cloud_node = testutil.cloud_node_mock(2)
158         self.make_daemon([cloud_node],
159                          [testutil.arvados_node_mock(1, ip_address=None)])
160         arv_node = testutil.arvados_node_mock(2)
161         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
162         self.check_monitors_arvados_nodes(arv_node)
163
164     def test_arvados_node_un_and_re_paired(self):
165         # We need to create the Arvados node mock after spinning up the daemon
166         # to make sure it's new enough to pair with the cloud node.
167         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
168                          arvados_nodes=None)
169         arv_node = testutil.arvados_node_mock(3)
170         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
171         self.check_monitors_arvados_nodes(arv_node)
172         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
173         self.busywait(lambda: 0 == self.alive_monitor_count())
174         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
175         self.check_monitors_arvados_nodes(arv_node)
176
177     def test_old_arvados_node_not_double_assigned(self):
178         arv_node = testutil.arvados_node_mock(3, age=9000)
179         size = testutil.MockSize(3)
180         self.make_daemon(arvados_nodes=[arv_node],
181                          avail_sizes=[(size, {"cores":1})])
182         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
183         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
184         self.stop_proxy(self.daemon)
185         used_nodes = [call[1].get('arvados_node')
186                       for call in self.node_setup.start.call_args_list]
187         self.assertEqual(2, len(used_nodes))
188         self.assertIn(arv_node, used_nodes)
189         self.assertIn(None, used_nodes)
190
191     def test_node_count_satisfied(self):
192         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
193                          want_sizes=[testutil.MockSize(1)])
194         self.busywait(lambda: not self.node_setup.start.called)
195
196     def test_dont_count_missing_as_busy(self):
197         size = testutil.MockSize(1)
198         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
199                                       testutil.cloud_node_mock(2, size=size)],
200                          arvados_nodes=[testutil.arvados_node_mock(1),
201                                         testutil.arvados_node_mock(
202                                             2,
203                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
204                          want_sizes=[size, size])
205         self.busywait(lambda: 2 == self.alive_monitor_count())
206         self.busywait(lambda: self.node_setup.start.called)
207
208     def test_missing_counts_towards_max(self):
209         size = testutil.MockSize(1)
210         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
211                                       testutil.cloud_node_mock(2, size=size)],
212                          arvados_nodes=[testutil.arvados_node_mock(1),
213                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
214                          want_sizes=[size, size],
215                          max_nodes=2)
216         self.busywait(lambda: not self.node_setup.start.called)
217
218     def test_excess_counts_missing(self):
219         size = testutil.MockSize(1)
220         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
221         self.make_daemon(cloud_nodes=cloud_nodes,
222                          arvados_nodes=[testutil.arvados_node_mock(1),
223                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
224                          want_sizes=[size])
225         self.busywait(lambda: 2 == self.paired_monitor_count())
226         for mon_ref in self.monitor_list():
227             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
228         self.assertEqual(1, self.node_shutdown.start.call_count)
229
230     def test_missing_shutdown_not_excess(self):
231         size = testutil.MockSize(1)
232         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
233         self.make_daemon(cloud_nodes=cloud_nodes,
234                          arvados_nodes=[testutil.arvados_node_mock(1),
235                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
236                          want_sizes=[size])
237         self.busywait(lambda: 2 == self.paired_monitor_count())
238         get_cloud_node = mock.MagicMock(name="get_cloud_node")
239         get_cloud_node.get.return_value = cloud_nodes[1]
240         mock_node_monitor = mock.MagicMock()
241         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
242         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
243
244         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
245
246         self.busywait(lambda: 2 == self.alive_monitor_count())
247         for mon_ref in self.monitor_list():
248             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
249         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
250
251     def test_booting_nodes_counted(self):
252         cloud_node = testutil.cloud_node_mock(1)
253         arv_node = testutil.arvados_node_mock(1)
254         server_wishlist = [testutil.MockSize(1)] * 2
255         self.make_daemon([cloud_node], [arv_node], server_wishlist)
256         self.daemon.max_nodes.get(self.TIMEOUT)
257         self.assertTrue(self.node_setup.start.called)
258         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
259         self.busywait(lambda: 1 == self.node_setup.start.call_count)
260
261     def test_boot_new_node_when_all_nodes_busy(self):
262         size = testutil.MockSize(2)
263         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
264         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
265                          [size], avail_sizes=[(size, {"cores":1})])
266         self.busywait(lambda: 1 == self.paired_monitor_count())
267         self.busywait(lambda: self.node_setup.start.called)
268
269     def test_boot_new_node_below_min_nodes(self):
270         min_size = testutil.MockSize(1)
271         wish_size = testutil.MockSize(3)
272         avail_sizes = [(min_size, {"cores": 1}),
273                        (wish_size, {"cores": 3})]
274         self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
275         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
276         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
277         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
278         self.stop_proxy(self.daemon)
279         self.assertEqual([wish_size, min_size],
280                          [call[1].get('cloud_size')
281                           for call in self.node_setup.start.call_args_list])
282
283     def test_no_new_node_when_ge_min_nodes_busy(self):
284         size = testutil.MockSize(2)
285         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
286         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
287                      for n in range(1, 4)]
288         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
289         self.stop_proxy(self.daemon)
290         self.assertEqual(0, self.node_setup.start.call_count)
291
292     def test_no_new_node_when_max_nodes_busy(self):
293         size = testutil.MockSize(3)
294         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
295                          arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
296                          want_sizes=[size],
297                          max_nodes=1)
298         self.stop_proxy(self.daemon)
299         self.assertFalse(self.node_setup.start.called)
300
301     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
302         if cloud_node is None:
303             cloud_node = testutil.cloud_node_mock(id_num)
304         id_num = int(cloud_node.id)
305         if arv_node is None:
306             arv_node = testutil.arvados_node_mock(id_num)
307         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
308                          avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
309         self.daemon.max_nodes.get(self.TIMEOUT)
310         self.assertEqual(1, self.node_setup.start.call_count)
311         self.last_setup.cloud_node.get.return_value = cloud_node
312         self.last_setup.arvados_node.get.return_value = arv_node
313         return self.last_setup
314
315     def test_new_node_when_booted_node_not_usable(self):
316         cloud_node = testutil.cloud_node_mock(4)
317         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
318         setup = self.start_node_boot(cloud_node, arv_node)
319         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
320         self.assertEqual(1, self.alive_monitor_count())
321         self.daemon.update_arvados_nodes([arv_node])
322         self.daemon.update_cloud_nodes([cloud_node])
323         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
324         self.daemon.update_server_wishlist(
325             [testutil.MockSize(4)]).get(self.TIMEOUT)
326         self.stop_proxy(self.daemon)
327         self.assertEqual(2, self.node_setup.start.call_count)
328
329     def test_no_duplication_when_booting_node_listed_fast(self):
330         # Test that we don't start two ComputeNodeMonitorActors when
331         # we learn about a booting node through a listing before we
332         # get the "node up" message from CloudNodeSetupActor.
333         cloud_node = testutil.cloud_node_mock(1)
334         setup = self.start_node_boot(cloud_node)
335         self.daemon.update_cloud_nodes([cloud_node])
336         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
337         self.assertEqual(1, self.alive_monitor_count())
338
339     def test_no_duplication_when_booted_node_listed(self):
340         cloud_node = testutil.cloud_node_mock(2)
341         setup = self.start_node_boot(cloud_node, id_num=2)
342         self.daemon.node_setup_finished(setup)
343         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
344         self.assertEqual(1, self.alive_monitor_count())
345
346     def test_node_counted_after_boot_with_slow_listing(self):
347         # Test that, after we boot a compute node, we assume it exists
348         # even it doesn't appear in the listing (e.g., because of delays
349         # propagating tags).
350         setup = self.start_node_boot()
351         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
352         self.assertEqual(1, self.alive_monitor_count())
353         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
354         self.assertEqual(1, self.alive_monitor_count())
355
356     def test_booted_unlisted_node_counted(self):
357         setup = self.start_node_boot(id_num=1)
358         self.daemon.node_setup_finished(setup)
359         self.daemon.update_server_wishlist(
360             [testutil.MockSize(1)]).get(self.TIMEOUT)
361         self.stop_proxy(self.daemon)
362         self.assertEqual(1, self.node_setup.start.call_count)
363
364     def test_booted_node_can_shutdown(self):
365         setup = self.start_node_boot()
366         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
367         self.assertEqual(1, self.alive_monitor_count())
368         monitor = self.monitor_list()[0].proxy()
369         self.daemon.update_server_wishlist([])
370         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
371         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
372         self.stop_proxy(self.daemon)
373         self.assertTrue(self.node_shutdown.start.called,
374                         "daemon did not shut down booted node on offer")
375
376         with test_status.TestServer() as srv:
377             self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
378             self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
379             self.assertEqual(0, srv.get_status().get('nodes_wish', None))
380
381     def test_booted_node_lifecycle(self):
382         cloud_node = testutil.cloud_node_mock(6)
383         setup = self.start_node_boot(cloud_node, id_num=6)
384         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
385         self.assertEqual(1, self.alive_monitor_count())
386         monitor = self.monitor_list()[0].proxy()
387         self.daemon.update_server_wishlist([])
388         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
389         self.assertShutdownCancellable(True)
390         shutdown = self.node_shutdown.start().proxy()
391         shutdown.cloud_node.get.return_value = cloud_node
392         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
393         self.daemon.update_cloud_nodes([])
394         self.assertTrue(shutdown.stop.called,
395                         "shutdown actor not stopped after finishing")
396         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
397                         "monitor for booted node not stopped after shutdown")
398         self.daemon.update_server_wishlist(
399             [testutil.MockSize(2)]).get(self.TIMEOUT)
400         self.stop_proxy(self.daemon)
401         self.assertTrue(self.node_setup.start.called,
402                         "second node not started after booted node stopped")
403
404     def test_node_disappearing_during_shutdown(self):
405         cloud_node = testutil.cloud_node_mock(6)
406         setup = self.start_node_boot(cloud_node, id_num=6)
407         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
408         self.assertEqual(1, self.alive_monitor_count())
409         monitor = self.monitor_list()[0].proxy()
410         self.daemon.update_server_wishlist([])
411         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
412         self.assertShutdownCancellable(True)
413         shutdown = self.node_shutdown.start().proxy()
414         shutdown.cloud_node.get.return_value = cloud_node
415         # Simulate a successful but slow node destroy call: the cloud node
416         # list gets updated before the ShutdownActor finishes.
417         record = self.daemon.cloud_nodes.get().nodes.values()[0]
418         self.assertTrue(record.shutdown_actor is not None)
419         self.daemon.cloud_nodes.get().nodes.clear()
420         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
421         self.assertTrue(
422             record.shutdown_actor is not None,
423             "test was ineffective -- failed to simulate the race condition")
424
425     def test_booted_node_shut_down_when_never_listed(self):
426         setup = self.start_node_boot()
427         self.cloud_factory().node_start_time.return_value = time.time() - 3601
428         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
429         self.assertEqual(1, self.alive_monitor_count())
430         self.assertFalse(self.node_shutdown.start.called)
431         now = time.time()
432         self.monitor_list()[0].tell_proxy().consider_shutdown()
433         self.busywait(lambda: self.node_shutdown.start.called)
434         self.assertShutdownCancellable(False)
435
436     def test_booted_node_shut_down_when_never_paired(self):
437         cloud_node = testutil.cloud_node_mock(2)
438         setup = self.start_node_boot(cloud_node)
439         self.cloud_factory().node_start_time.return_value = time.time() - 3601
440         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
441         self.assertEqual(1, self.alive_monitor_count())
442         self.daemon.update_cloud_nodes([cloud_node])
443         self.monitor_list()[0].tell_proxy().consider_shutdown()
444         self.busywait(lambda: self.node_shutdown.start.called)
445         self.assertShutdownCancellable(False)
446
447     def test_booted_node_shut_down_when_never_working(self):
448         cloud_node = testutil.cloud_node_mock(4)
449         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
450         setup = self.start_node_boot(cloud_node, arv_node)
451         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
452         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
453         self.assertEqual(1, self.alive_monitor_count())
454         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
455         self.daemon.update_cloud_nodes([cloud_node])
456         self.busywait(lambda: self.node_shutdown.start.called)
457         self.assertShutdownCancellable(False)
458
459     def test_node_that_pairs_not_considered_failed_boot(self):
460         cloud_node = testutil.cloud_node_mock(3)
461         arv_node = testutil.arvados_node_mock(3)
462         setup = self.start_node_boot(cloud_node, arv_node)
463         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
464         self.assertEqual(1, self.alive_monitor_count())
465         self.daemon.update_cloud_nodes([cloud_node])
466         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
467         self.timer.deliver()
468         self.stop_proxy(self.daemon)
469         self.assertFalse(self.node_shutdown.start.called)
470
471     def test_node_that_pairs_busy_not_considered_failed_boot(self):
472         cloud_node = testutil.cloud_node_mock(5)
473         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
474         setup = self.start_node_boot(cloud_node, arv_node)
475         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
476         self.assertEqual(1, self.alive_monitor_count())
477         self.daemon.update_cloud_nodes([cloud_node])
478         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
479         self.timer.deliver()
480         self.stop_proxy(self.daemon)
481         self.assertFalse(self.node_shutdown.start.called)
482
483     def test_booting_nodes_shut_down(self):
484         self.make_daemon(want_sizes=[testutil.MockSize(1)])
485         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
486         self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
487
488     def test_all_booting_nodes_tried_to_shut_down(self):
489         size = testutil.MockSize(2)
490         self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
491         self.daemon.max_nodes.get(self.TIMEOUT)
492         setup1 = self.last_setup
493         setup1.stop_if_no_cloud_node().get.return_value = False
494         setup1.stop_if_no_cloud_node.reset_mock()
495         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
496         self.daemon.max_nodes.get(self.TIMEOUT)
497         self.assertIsNot(setup1, self.last_setup)
498         self.last_setup.stop_if_no_cloud_node().get.return_value = True
499         self.last_setup.stop_if_no_cloud_node.reset_mock()
500         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
501         self.daemon.max_nodes.get(self.TIMEOUT)
502         self.stop_proxy(self.daemon)
503         self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
504         self.assertTrue(setup1.stop_if_no_cloud_node.called)
505
506     def test_shutdown_declined_at_wishlist_capacity(self):
507         cloud_node = testutil.cloud_node_mock(1)
508         arv_node = testutil.arvados_node_mock(1)
509         size = testutil.MockSize(1)
510         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
511         self.busywait(lambda: 1 == self.paired_monitor_count())
512         monitor = self.monitor_list()[0].proxy()
513         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
514         self.stop_proxy(self.daemon)
515         self.assertFalse(self.node_shutdown.start.called)
516
517     def test_shutdown_declined_below_min_nodes(self):
518         cloud_node = testutil.cloud_node_mock(1)
519         arv_node = testutil.arvados_node_mock(1)
520         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
521         self.busywait(lambda: 1 == self.paired_monitor_count())
522         monitor = self.monitor_list()[0].proxy()
523         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
524         self.stop_proxy(self.daemon)
525         self.assertFalse(self.node_shutdown.start.called)
526
527     def test_shutdown_accepted_below_capacity(self):
528         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
529         self.busywait(lambda: 1 == self.alive_monitor_count())
530         monitor = self.monitor_list()[0].proxy()
531         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
532         self.busywait(lambda: self.node_shutdown.start.called)
533
534     def test_shutdown_declined_when_idle_and_job_queued(self):
535         size = testutil.MockSize(1)
536         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
537         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
538                      testutil.arvados_node_mock(4, job_uuid=None)]
539         self.make_daemon(cloud_nodes, arv_nodes, [size])
540         self.busywait(lambda: 2 == self.paired_monitor_count())
541         for mon_ref in self.monitor_list():
542             monitor = mon_ref.proxy()
543             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
544                 break
545         else:
546             self.fail("monitor for idle node not found")
547         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
548         self.stop_proxy(self.daemon)
549         self.assertFalse(self.node_shutdown.start.called)
550
551     def test_node_shutdown_after_cancelled_shutdown(self):
552         cloud_node = testutil.cloud_node_mock(5)
553         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
554         self.assertEqual(1, self.alive_monitor_count())
555         monitor = self.monitor_list()[0].proxy()
556         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
557         self.last_shutdown.success.get.return_value = False
558         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
559         self.busywait(lambda: 1 == self.paired_monitor_count())
560
561         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
562         self.last_shutdown.success.get.return_value = True
563         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
564         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
565         self.busywait(lambda: 0 == self.paired_monitor_count())
566
567     def test_nodes_shutting_down_replaced_below_max_nodes(self):
568         size = testutil.MockSize(6)
569         cloud_node = testutil.cloud_node_mock(6, size=size)
570         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
571                          avail_sizes=[(size, {"cores":1})])
572         self.assertEqual(1, self.alive_monitor_count())
573         monitor = self.monitor_list()[0].proxy()
574         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
575         self.assertTrue(self.node_shutdown.start.called)
576         self.daemon.update_server_wishlist(
577             [testutil.MockSize(6)]).get(self.TIMEOUT)
578         self.busywait(lambda: self.node_setup.start.called)
579
580     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
581         cloud_node = testutil.cloud_node_mock(7)
582         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
583                          max_nodes=1)
584         self.busywait(lambda: 1 == self.paired_monitor_count())
585         monitor = self.monitor_list()[0].proxy()
586         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
587         self.assertTrue(self.node_shutdown.start.called)
588         self.daemon.update_server_wishlist(
589             [testutil.MockSize(7)]).get(self.TIMEOUT)
590         self.busywait(lambda: not self.node_setup.start.called)
591
592     def test_nodes_shutting_down_count_against_excess(self):
593         size = testutil.MockSize(8)
594         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
595         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
596         self.make_daemon(cloud_nodes, arv_nodes, [size],
597                          avail_sizes=[(size, {"cores":1})])
598         self.busywait(lambda: 2 == self.paired_monitor_count())
599         for mon_ref in self.monitor_list():
600             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
601         self.assertEqual(1, self.node_shutdown.start.call_count)
602
603     def test_clean_shutdown_waits_for_node_setup_finish(self):
604         new_node = self.start_node_boot()
605         new_node.stop_if_no_cloud_node().get.return_value = False
606         new_node.stop_if_no_cloud_node.reset_mock()
607         self.daemon.shutdown().get(self.TIMEOUT)
608         self.assertTrue(new_node.stop_if_no_cloud_node.called)
609         self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
610         self.assertTrue(new_node.stop.called)
611         self.timer.deliver()
612         self.assertTrue(
613             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
614
615     def test_wishlist_ignored_after_shutdown(self):
616         new_node = self.start_node_boot()
617         new_node.stop_if_no_cloud_node().get.return_value = False
618         new_node.stop_if_no_cloud_node.reset_mock()
619         self.daemon.shutdown().get(self.TIMEOUT)
620         size = testutil.MockSize(2)
621         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
622         self.timer.deliver()
623         self.busywait(lambda: 1 == self.node_setup.start.call_count)
624
625     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
626         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
627         self.assertEqual(1, self.alive_monitor_count())
628         monitor = self.monitor_list()[0].proxy()
629         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
630         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
631         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
632
633     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
634         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
635         self.assertEqual(1, self.alive_monitor_count())
636         monitor = self.monitor_list()[0].proxy()
637         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
638         # We're mainly testing that update_cloud_nodes catches and handles
639         # the ActorDeadError.
640         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
641         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
642         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
643
644     def test_node_create_two_sizes(self):
645         small = testutil.MockSize(1)
646         big = testutil.MockSize(2)
647         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
648                         (testutil.MockSize(2), {"cores":2})]
649         self.make_daemon(want_sizes=[small, small, small, big],
650                          avail_sizes=avail_sizes, max_nodes=4)
651
652         # the daemon runs in another thread, so we need to wait and see
653         # if it does all the work we're expecting it to do before stopping it.
654         self.busywait(lambda: self.node_setup.start.call_count == 4)
655         booting = self.daemon.booting.get(self.TIMEOUT)
656         self.stop_proxy(self.daemon)
657         sizecounts = {a[0].id: 0 for a in avail_sizes}
658         for b in booting.itervalues():
659             sizecounts[b.cloud_size.get().id] += 1
660         logging.info(sizecounts)
661         self.assertEqual(3, sizecounts[small.id])
662         self.assertEqual(1, sizecounts[big.id])
663
664     def test_node_max_nodes_two_sizes(self):
665         small = testutil.MockSize(1)
666         big = testutil.MockSize(2)
667         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
668                         (testutil.MockSize(2), {"cores":2})]
669         self.make_daemon(want_sizes=[small, small, small, big],
670                          avail_sizes=avail_sizes, max_nodes=3)
671
672         # the daemon runs in another thread, so we need to wait and see
673         # if it does all the work we're expecting it to do before stopping it.
674         self.busywait(lambda: self.node_setup.start.call_count == 3)
675         booting = self.daemon.booting.get(self.TIMEOUT)
676         self.stop_proxy(self.daemon)
677         sizecounts = {a[0].id: 0 for a in avail_sizes}
678         for b in booting.itervalues():
679             sizecounts[b.cloud_size.get().id] += 1
680         self.assertEqual(2, sizecounts[small.id])
681         self.assertEqual(1, sizecounts[big.id])
682
683     def test_wishlist_reconfigure(self):
684         small = testutil.MockSize(1)
685         big = testutil.MockSize(2)
686         avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
687
688         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
689                                       testutil.cloud_node_mock(2, small),
690                                       testutil.cloud_node_mock(3, big)],
691                          arvados_nodes=[testutil.arvados_node_mock(1),
692                                         testutil.arvados_node_mock(2),
693                                         testutil.arvados_node_mock(3)],
694                          want_sizes=[small, small, big],
695                          avail_sizes=avail_sizes)
696         self.busywait(lambda: 3 == self.paired_monitor_count())
697         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
698
699         self.assertEqual(0, self.node_shutdown.start.call_count)
700
701         for c in self.daemon.cloud_nodes.get().nodes.itervalues():
702             self.daemon.node_can_shutdown(c.actor)
703
704         booting = self.daemon.booting.get()
705         cloud_nodes = self.daemon.cloud_nodes.get()
706
707         self.busywait(lambda: 1 == self.node_setup.start.call_count)
708         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
709
710         self.stop_proxy(self.daemon)
711
712         # booting a new big node
713         sizecounts = {a[0].id: 0 for a in avail_sizes}
714         for b in booting.itervalues():
715             sizecounts[b.cloud_size.get().id] += 1
716         self.assertEqual(0, sizecounts[small.id])
717         self.assertEqual(1, sizecounts[big.id])
718
719         # shutting down a small node
720         sizecounts = {a[0].id: 0 for a in avail_sizes}
721         for b in cloud_nodes.nodes.itervalues():
722             if b.shutdown_actor is not None:
723                 sizecounts[b.cloud_node.size.id] += 1
724         self.assertEqual(1, sizecounts[small.id])
725         self.assertEqual(0, sizecounts[big.id])
726
727     def test_node_max_price(self):
728         small = testutil.MockSize(1)
729         big = testutil.MockSize(2)
730         avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
731                         (testutil.MockSize(2), {"cores":2, "price":2})]
732         self.make_daemon(want_sizes=[small, small, small, big],
733                          avail_sizes=avail_sizes,
734                          max_nodes=4,
735                          max_total_price=4)
736         # the daemon runs in another thread, so we need to wait and see
737         # if it does all the work we're expecting it to do before stopping it.
738         self.busywait(lambda: self.node_setup.start.call_count == 3)
739         booting = self.daemon.booting.get()
740         self.stop_proxy(self.daemon)
741
742         sizecounts = {a[0].id: 0 for a in avail_sizes}
743         for b in booting.itervalues():
744             sizecounts[b.cloud_size.get().id] += 1
745         logging.info(sizecounts)
746
747         # Booting 3 small nodes and not booting a big node would also partially
748         # satisfy the wishlist and come in under the price cap, however the way
749         # the update_server_wishlist() currently works effectively results in a
750         # round-robin creation of one node of each size in the wishlist, so
751         # test for that.
752         self.assertEqual(2, sizecounts[small.id])
753         self.assertEqual(1, sizecounts[big.id])