Merge branch '12100-cwltool-update' closes #12100
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import time
9 import unittest
10
11 import mock
12 import pykka
13
14 import arvnodeman.daemon as nmdaemon
15 import arvnodeman.status as status
16 from arvnodeman.jobqueue import ServerCalculator
17 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
18 from . import testutil
19 from . import test_status
20 import logging
21
22 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
23                                      unittest.TestCase):
24
25     def busywait(self, f):
26         n = 0
27         while not f() and n < 200:
28             time.sleep(.1)
29             self.daemon.ping().get(self.TIMEOUT)
30             n += 1
31         self.assertTrue(f())
32
33     def mock_node_start(self, **kwargs):
34         # Make sure that every time the daemon starts a setup actor,
35         # it gets a new mock object back.
36         get_cloud_size = mock.MagicMock()
37         get_cloud_size.get.return_value = kwargs["cloud_size"]
38         mock_actor = mock.MagicMock()
39         mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
40                                           cloud_size=get_cloud_size,
41                                           actor_ref=mock_actor)
42         mock_actor.proxy.return_value = mock_proxy
43         mock_actor.tell_proxy.return_value = mock_proxy
44
45         self.last_setup = mock_proxy
46         return mock_actor
47
48     def mock_node_shutdown(self, **kwargs):
49         # Make sure that every time the daemon starts a shutdown actor,
50         # it gets a new mock object back.
51         get_cloud_node = mock.MagicMock()
52         if "node_monitor" in kwargs:
53             get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
54         mock_actor = mock.MagicMock()
55         mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
56                                           cloud_node=get_cloud_node,
57                                           actor_ref=mock_actor)
58
59         mock_actor.proxy.return_value = mock_proxy
60         self.last_shutdown = mock_proxy
61
62         return mock_actor
63
64     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
65                     avail_sizes=None,
66                     min_nodes=0, max_nodes=8,
67                     shutdown_windows=[54, 5, 1],
68                     max_total_price=None):
69         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
70             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
71
72         if not avail_sizes:
73             if cloud_nodes or want_sizes:
74                 avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
75             else:
76                 avail_sizes=[(testutil.MockSize(1), {"cores": 1})]
77
78         self.arv_factory = mock.MagicMock(name='arvados_mock')
79         api_client = mock.MagicMock(name='api_client')
80         api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
81                                                            testutil.arvados_node_mock(2)]
82         self.arv_factory.return_value = api_client
83
84         self.cloud_factory = mock.MagicMock(name='cloud_mock')
85         self.cloud_factory().node_start_time.return_value = time.time()
86         self.cloud_updates = mock.MagicMock(name='updates_mock')
87         self.timer = testutil.MockTimer(deliver_immediately=False)
88         self.cloud_factory().node_id.side_effect = lambda node: node.id
89         self.cloud_factory().broken.return_value = False
90
91         self.node_setup = mock.MagicMock(name='setup_mock')
92         self.node_setup.start.side_effect = self.mock_node_start
93         self.node_setup.reset_mock()
94
95         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
96         self.node_shutdown.start.side_effect = self.mock_node_shutdown
97
98         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
99             self.server_wishlist_poller, self.arvados_nodes_poller,
100             self.cloud_nodes_poller, self.cloud_updates, self.timer,
101             self.arv_factory, self.cloud_factory,
102             shutdown_windows, ServerCalculator(avail_sizes),
103             min_nodes, max_nodes, 600, 1800, 3600,
104             self.node_setup, self.node_shutdown,
105             max_total_price=max_total_price).proxy()
106         if arvados_nodes is not None:
107             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
108         if cloud_nodes is not None:
109             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
110         if want_sizes is not None:
111             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
112
113     def monitor_list(self):
114         return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
115
116     def monitored_arvados_nodes(self, include_unpaired=True):
117         pairings = []
118         for future in [actor.proxy().arvados_node
119                        for actor in self.monitor_list()]:
120             try:
121                 g = future.get(self.TIMEOUT)
122                 if g or include_unpaired:
123                     pairings.append(g)
124             except pykka.ActorDeadError:
125                 pass
126         return pairings
127
128     def alive_monitor_count(self):
129         return len(self.monitored_arvados_nodes())
130
131     def paired_monitor_count(self):
132         return len(self.monitored_arvados_nodes(False))
133
134     def assertShutdownCancellable(self, expected=True):
135         self.assertTrue(self.node_shutdown.start.called)
136         self.assertIs(expected,
137                       self.node_shutdown.start.call_args[1]['cancellable'],
138                       "ComputeNodeShutdownActor incorrectly cancellable")
139
140     def test_easy_node_creation(self):
141         size = testutil.MockSize(1)
142         self.make_daemon(want_sizes=[size])
143         self.busywait(lambda: self.node_setup.start.called)
144
145     def check_monitors_arvados_nodes(self, *arv_nodes):
146         self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
147         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
148
149     def test_node_pairing(self):
150         cloud_node = testutil.cloud_node_mock(1)
151         arv_node = testutil.arvados_node_mock(1)
152         self.make_daemon([cloud_node], [arv_node])
153         self.check_monitors_arvados_nodes(arv_node)
154
155     def test_node_pairing_after_arvados_update(self):
156         cloud_node = testutil.cloud_node_mock(2)
157         self.make_daemon([cloud_node],
158                          [testutil.arvados_node_mock(1, ip_address=None)])
159         arv_node = testutil.arvados_node_mock(2)
160         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
161         self.check_monitors_arvados_nodes(arv_node)
162
163     def test_arvados_node_un_and_re_paired(self):
164         # We need to create the Arvados node mock after spinning up the daemon
165         # to make sure it's new enough to pair with the cloud node.
166         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
167                          arvados_nodes=None)
168         arv_node = testutil.arvados_node_mock(3)
169         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
170         self.check_monitors_arvados_nodes(arv_node)
171         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
172         self.busywait(lambda: 0 == self.alive_monitor_count())
173         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
174         self.check_monitors_arvados_nodes(arv_node)
175
176     def test_old_arvados_node_not_double_assigned(self):
177         arv_node = testutil.arvados_node_mock(3, age=9000)
178         size = testutil.MockSize(3)
179         self.make_daemon(arvados_nodes=[arv_node],
180                          avail_sizes=[(size, {"cores":1})])
181         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
182         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
183         self.stop_proxy(self.daemon)
184         used_nodes = [call[1].get('arvados_node')
185                       for call in self.node_setup.start.call_args_list]
186         self.assertEqual(2, len(used_nodes))
187         self.assertIn(arv_node, used_nodes)
188         self.assertIn(None, used_nodes)
189
190     def test_node_count_satisfied(self):
191         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
192                          want_sizes=[testutil.MockSize(1)])
193         self.busywait(lambda: not self.node_setup.start.called)
194
195     def test_dont_count_missing_as_busy(self):
196         size = testutil.MockSize(1)
197         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
198                                       testutil.cloud_node_mock(2, size=size)],
199                          arvados_nodes=[testutil.arvados_node_mock(1),
200                                         testutil.arvados_node_mock(
201                                             2,
202                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
203                          want_sizes=[size, size])
204         self.busywait(lambda: 2 == self.alive_monitor_count())
205         self.busywait(lambda: self.node_setup.start.called)
206
207     def test_missing_counts_towards_max(self):
208         size = testutil.MockSize(1)
209         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
210                                       testutil.cloud_node_mock(2, size=size)],
211                          arvados_nodes=[testutil.arvados_node_mock(1),
212                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
213                          want_sizes=[size, size],
214                          max_nodes=2)
215         self.busywait(lambda: not self.node_setup.start.called)
216
217     def test_excess_counts_missing(self):
218         size = testutil.MockSize(1)
219         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
220         self.make_daemon(cloud_nodes=cloud_nodes,
221                          arvados_nodes=[testutil.arvados_node_mock(1),
222                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
223                          want_sizes=[size])
224         self.busywait(lambda: 2 == self.paired_monitor_count())
225         for mon_ref in self.monitor_list():
226             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
227         self.assertEqual(1, self.node_shutdown.start.call_count)
228
229     def test_missing_shutdown_not_excess(self):
230         size = testutil.MockSize(1)
231         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
232         self.make_daemon(cloud_nodes=cloud_nodes,
233                          arvados_nodes=[testutil.arvados_node_mock(1),
234                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
235                          want_sizes=[size])
236         self.busywait(lambda: 2 == self.paired_monitor_count())
237         get_cloud_node = mock.MagicMock(name="get_cloud_node")
238         get_cloud_node.get.return_value = cloud_nodes[1]
239         mock_node_monitor = mock.MagicMock()
240         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
241         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
242
243         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
244
245         self.busywait(lambda: 2 == self.alive_monitor_count())
246         for mon_ref in self.monitor_list():
247             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
248         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
249
250     def test_booting_nodes_counted(self):
251         cloud_node = testutil.cloud_node_mock(1)
252         arv_node = testutil.arvados_node_mock(1)
253         server_wishlist = [testutil.MockSize(1)] * 2
254         self.make_daemon([cloud_node], [arv_node], server_wishlist)
255         self.daemon.max_nodes.get(self.TIMEOUT)
256         self.assertTrue(self.node_setup.start.called)
257         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
258         self.busywait(lambda: 1 == self.node_setup.start.call_count)
259
260     def test_boot_new_node_when_all_nodes_busy(self):
261         size = testutil.MockSize(2)
262         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
263         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
264                          [size], avail_sizes=[(size, {"cores":1})])
265         self.busywait(lambda: 1 == self.paired_monitor_count())
266         self.busywait(lambda: self.node_setup.start.called)
267
268     def test_boot_new_node_below_min_nodes(self):
269         min_size = testutil.MockSize(1)
270         wish_size = testutil.MockSize(3)
271         avail_sizes = [(min_size, {"cores": 1}),
272                        (wish_size, {"cores": 3})]
273         self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
274         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
275         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
276         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
277         self.stop_proxy(self.daemon)
278         self.assertEqual([wish_size, min_size],
279                          [call[1].get('cloud_size')
280                           for call in self.node_setup.start.call_args_list])
281
282     def test_no_new_node_when_ge_min_nodes_busy(self):
283         size = testutil.MockSize(2)
284         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
285         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
286                      for n in range(1, 4)]
287         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
288         self.stop_proxy(self.daemon)
289         self.assertEqual(0, self.node_setup.start.call_count)
290
291     def test_no_new_node_when_max_nodes_busy(self):
292         size = testutil.MockSize(3)
293         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
294                          arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
295                          want_sizes=[size],
296                          max_nodes=1)
297         self.stop_proxy(self.daemon)
298         self.assertFalse(self.node_setup.start.called)
299
300     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
301         if cloud_node is None:
302             cloud_node = testutil.cloud_node_mock(id_num)
303         id_num = int(cloud_node.id)
304         if arv_node is None:
305             arv_node = testutil.arvados_node_mock(id_num)
306         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
307                          avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
308         self.daemon.max_nodes.get(self.TIMEOUT)
309         self.assertEqual(1, self.node_setup.start.call_count)
310         self.last_setup.cloud_node.get.return_value = cloud_node
311         self.last_setup.arvados_node.get.return_value = arv_node
312         return self.last_setup
313
314     def test_new_node_when_booted_node_not_usable(self):
315         cloud_node = testutil.cloud_node_mock(4)
316         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
317         setup = self.start_node_boot(cloud_node, arv_node)
318         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
319         self.assertEqual(1, self.alive_monitor_count())
320         self.daemon.update_arvados_nodes([arv_node])
321         self.daemon.update_cloud_nodes([cloud_node])
322         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
323         self.daemon.update_server_wishlist(
324             [testutil.MockSize(4)]).get(self.TIMEOUT)
325         self.stop_proxy(self.daemon)
326         self.assertEqual(2, self.node_setup.start.call_count)
327
328     def test_no_duplication_when_booting_node_listed_fast(self):
329         # Test that we don't start two ComputeNodeMonitorActors when
330         # we learn about a booting node through a listing before we
331         # get the "node up" message from CloudNodeSetupActor.
332         cloud_node = testutil.cloud_node_mock(1)
333         setup = self.start_node_boot(cloud_node)
334         self.daemon.update_cloud_nodes([cloud_node])
335         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
336         self.assertEqual(1, self.alive_monitor_count())
337
338     def test_no_duplication_when_booted_node_listed(self):
339         cloud_node = testutil.cloud_node_mock(2)
340         setup = self.start_node_boot(cloud_node, id_num=2)
341         self.daemon.node_setup_finished(setup)
342         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
343         self.assertEqual(1, self.alive_monitor_count())
344
345     def test_node_counted_after_boot_with_slow_listing(self):
346         # Test that, after we boot a compute node, we assume it exists
347         # even it doesn't appear in the listing (e.g., because of delays
348         # propagating tags).
349         setup = self.start_node_boot()
350         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
351         self.assertEqual(1, self.alive_monitor_count())
352         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
353         self.assertEqual(1, self.alive_monitor_count())
354
355     def test_booted_unlisted_node_counted(self):
356         setup = self.start_node_boot(id_num=1)
357         self.daemon.node_setup_finished(setup)
358         self.daemon.update_server_wishlist(
359             [testutil.MockSize(1)]).get(self.TIMEOUT)
360         self.stop_proxy(self.daemon)
361         self.assertEqual(1, self.node_setup.start.call_count)
362
363     def test_booted_node_can_shutdown(self):
364         setup = self.start_node_boot()
365         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
366         self.assertEqual(1, self.alive_monitor_count())
367         monitor = self.monitor_list()[0].proxy()
368         self.daemon.update_server_wishlist([])
369         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
370         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
371         self.stop_proxy(self.daemon)
372         self.assertTrue(self.node_shutdown.start.called,
373                         "daemon did not shut down booted node on offer")
374
375         with test_status.TestServer() as srv:
376             self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
377             self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
378             self.assertEqual(0, srv.get_status().get('nodes_wish', None))
379
380     def test_booted_node_lifecycle(self):
381         cloud_node = testutil.cloud_node_mock(6)
382         setup = self.start_node_boot(cloud_node, id_num=6)
383         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
384         self.assertEqual(1, self.alive_monitor_count())
385         monitor = self.monitor_list()[0].proxy()
386         self.daemon.update_server_wishlist([])
387         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
388         self.assertShutdownCancellable(True)
389         shutdown = self.node_shutdown.start().proxy()
390         shutdown.cloud_node.get.return_value = cloud_node
391         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
392         self.daemon.update_cloud_nodes([])
393         self.assertTrue(shutdown.stop.called,
394                         "shutdown actor not stopped after finishing")
395         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
396                         "monitor for booted node not stopped after shutdown")
397         self.daemon.update_server_wishlist(
398             [testutil.MockSize(2)]).get(self.TIMEOUT)
399         self.stop_proxy(self.daemon)
400         self.assertTrue(self.node_setup.start.called,
401                         "second node not started after booted node stopped")
402
403     def test_booted_node_shut_down_when_never_listed(self):
404         setup = self.start_node_boot()
405         self.cloud_factory().node_start_time.return_value = time.time() - 3601
406         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
407         self.assertEqual(1, self.alive_monitor_count())
408         self.assertFalse(self.node_shutdown.start.called)
409         now = time.time()
410         self.monitor_list()[0].tell_proxy().consider_shutdown()
411         self.busywait(lambda: self.node_shutdown.start.called)
412         self.assertShutdownCancellable(False)
413
414     def test_booted_node_shut_down_when_never_paired(self):
415         cloud_node = testutil.cloud_node_mock(2)
416         setup = self.start_node_boot(cloud_node)
417         self.cloud_factory().node_start_time.return_value = time.time() - 3601
418         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
419         self.assertEqual(1, self.alive_monitor_count())
420         self.daemon.update_cloud_nodes([cloud_node])
421         self.monitor_list()[0].tell_proxy().consider_shutdown()
422         self.busywait(lambda: self.node_shutdown.start.called)
423         self.assertShutdownCancellable(False)
424
425     def test_booted_node_shut_down_when_never_working(self):
426         cloud_node = testutil.cloud_node_mock(4)
427         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
428         setup = self.start_node_boot(cloud_node, arv_node)
429         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
430         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
431         self.assertEqual(1, self.alive_monitor_count())
432         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
433         self.daemon.update_cloud_nodes([cloud_node])
434         self.busywait(lambda: self.node_shutdown.start.called)
435         self.assertShutdownCancellable(False)
436
437     def test_node_that_pairs_not_considered_failed_boot(self):
438         cloud_node = testutil.cloud_node_mock(3)
439         arv_node = testutil.arvados_node_mock(3)
440         setup = self.start_node_boot(cloud_node, arv_node)
441         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
442         self.assertEqual(1, self.alive_monitor_count())
443         self.daemon.update_cloud_nodes([cloud_node])
444         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
445         self.timer.deliver()
446         self.stop_proxy(self.daemon)
447         self.assertFalse(self.node_shutdown.start.called)
448
449     def test_node_that_pairs_busy_not_considered_failed_boot(self):
450         cloud_node = testutil.cloud_node_mock(5)
451         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
452         setup = self.start_node_boot(cloud_node, arv_node)
453         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
454         self.assertEqual(1, self.alive_monitor_count())
455         self.daemon.update_cloud_nodes([cloud_node])
456         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
457         self.timer.deliver()
458         self.stop_proxy(self.daemon)
459         self.assertFalse(self.node_shutdown.start.called)
460
461     def test_booting_nodes_shut_down(self):
462         self.make_daemon(want_sizes=[testutil.MockSize(1)])
463         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
464         self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
465
466     def test_all_booting_nodes_tried_to_shut_down(self):
467         size = testutil.MockSize(2)
468         self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
469         self.daemon.max_nodes.get(self.TIMEOUT)
470         setup1 = self.last_setup
471         setup1.stop_if_no_cloud_node().get.return_value = False
472         setup1.stop_if_no_cloud_node.reset_mock()
473         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
474         self.daemon.max_nodes.get(self.TIMEOUT)
475         self.assertIsNot(setup1, self.last_setup)
476         self.last_setup.stop_if_no_cloud_node().get.return_value = True
477         self.last_setup.stop_if_no_cloud_node.reset_mock()
478         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
479         self.daemon.max_nodes.get(self.TIMEOUT)
480         self.stop_proxy(self.daemon)
481         self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
482         self.assertTrue(setup1.stop_if_no_cloud_node.called)
483
484     def test_shutdown_declined_at_wishlist_capacity(self):
485         cloud_node = testutil.cloud_node_mock(1)
486         arv_node = testutil.arvados_node_mock(1)
487         size = testutil.MockSize(1)
488         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
489         self.busywait(lambda: 1 == self.paired_monitor_count())
490         monitor = self.monitor_list()[0].proxy()
491         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
492         self.stop_proxy(self.daemon)
493         self.assertFalse(self.node_shutdown.start.called)
494
495     def test_shutdown_declined_below_min_nodes(self):
496         cloud_node = testutil.cloud_node_mock(1)
497         arv_node = testutil.arvados_node_mock(1)
498         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
499         self.busywait(lambda: 1 == self.paired_monitor_count())
500         monitor = self.monitor_list()[0].proxy()
501         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
502         self.stop_proxy(self.daemon)
503         self.assertFalse(self.node_shutdown.start.called)
504
505     def test_shutdown_accepted_below_capacity(self):
506         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
507         self.busywait(lambda: 1 == self.alive_monitor_count())
508         monitor = self.monitor_list()[0].proxy()
509         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
510         self.busywait(lambda: self.node_shutdown.start.called)
511
512     def test_shutdown_declined_when_idle_and_job_queued(self):
513         size = testutil.MockSize(1)
514         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
515         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
516                      testutil.arvados_node_mock(4, job_uuid=None)]
517         self.make_daemon(cloud_nodes, arv_nodes, [size])
518         self.busywait(lambda: 2 == self.paired_monitor_count())
519         for mon_ref in self.monitor_list():
520             monitor = mon_ref.proxy()
521             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
522                 break
523         else:
524             self.fail("monitor for idle node not found")
525         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
526         self.stop_proxy(self.daemon)
527         self.assertFalse(self.node_shutdown.start.called)
528
529     def test_node_shutdown_after_cancelled_shutdown(self):
530         cloud_node = testutil.cloud_node_mock(5)
531         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
532         self.assertEqual(1, self.alive_monitor_count())
533         monitor = self.monitor_list()[0].proxy()
534         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
535         self.last_shutdown.success.get.return_value = False
536         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
537         self.busywait(lambda: 1 == self.paired_monitor_count())
538
539         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
540         self.last_shutdown.success.get.return_value = True
541         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
542         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
543         self.busywait(lambda: 0 == self.paired_monitor_count())
544
545     def test_nodes_shutting_down_replaced_below_max_nodes(self):
546         size = testutil.MockSize(6)
547         cloud_node = testutil.cloud_node_mock(6, size=size)
548         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
549                          avail_sizes=[(size, {"cores":1})])
550         self.assertEqual(1, self.alive_monitor_count())
551         monitor = self.monitor_list()[0].proxy()
552         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
553         self.assertTrue(self.node_shutdown.start.called)
554         self.daemon.update_server_wishlist(
555             [testutil.MockSize(6)]).get(self.TIMEOUT)
556         self.busywait(lambda: self.node_setup.start.called)
557
558     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
559         cloud_node = testutil.cloud_node_mock(7)
560         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
561                          max_nodes=1)
562         self.busywait(lambda: 1 == self.paired_monitor_count())
563         monitor = self.monitor_list()[0].proxy()
564         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
565         self.assertTrue(self.node_shutdown.start.called)
566         self.daemon.update_server_wishlist(
567             [testutil.MockSize(7)]).get(self.TIMEOUT)
568         self.busywait(lambda: not self.node_setup.start.called)
569
570     def test_nodes_shutting_down_count_against_excess(self):
571         size = testutil.MockSize(8)
572         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
573         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
574         self.make_daemon(cloud_nodes, arv_nodes, [size],
575                          avail_sizes=[(size, {"cores":1})])
576         self.busywait(lambda: 2 == self.paired_monitor_count())
577         for mon_ref in self.monitor_list():
578             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
579         self.assertEqual(1, self.node_shutdown.start.call_count)
580
581     def test_clean_shutdown_waits_for_node_setup_finish(self):
582         new_node = self.start_node_boot()
583         new_node.stop_if_no_cloud_node().get.return_value = False
584         new_node.stop_if_no_cloud_node.reset_mock()
585         self.daemon.shutdown().get(self.TIMEOUT)
586         self.assertTrue(new_node.stop_if_no_cloud_node.called)
587         self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
588         self.assertTrue(new_node.stop.called)
589         self.timer.deliver()
590         self.assertTrue(
591             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
592
593     def test_wishlist_ignored_after_shutdown(self):
594         new_node = self.start_node_boot()
595         new_node.stop_if_no_cloud_node().get.return_value = False
596         new_node.stop_if_no_cloud_node.reset_mock()
597         self.daemon.shutdown().get(self.TIMEOUT)
598         size = testutil.MockSize(2)
599         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
600         self.timer.deliver()
601         self.busywait(lambda: 1 == self.node_setup.start.call_count)
602
603     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
604         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
605         self.assertEqual(1, self.alive_monitor_count())
606         monitor = self.monitor_list()[0].proxy()
607         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
608         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
609         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
610
611     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
612         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
613         self.assertEqual(1, self.alive_monitor_count())
614         monitor = self.monitor_list()[0].proxy()
615         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
616         # We're mainly testing that update_cloud_nodes catches and handles
617         # the ActorDeadError.
618         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
619         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
620         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
621
622     def test_node_create_two_sizes(self):
623         small = testutil.MockSize(1)
624         big = testutil.MockSize(2)
625         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
626                         (testutil.MockSize(2), {"cores":2})]
627         self.make_daemon(want_sizes=[small, small, small, big],
628                          avail_sizes=avail_sizes, max_nodes=4)
629
630         # the daemon runs in another thread, so we need to wait and see
631         # if it does all the work we're expecting it to do before stopping it.
632         self.busywait(lambda: self.node_setup.start.call_count == 4)
633         booting = self.daemon.booting.get(self.TIMEOUT)
634         self.stop_proxy(self.daemon)
635         sizecounts = {a[0].id: 0 for a in avail_sizes}
636         for b in booting.itervalues():
637             sizecounts[b.cloud_size.get().id] += 1
638         logging.info(sizecounts)
639         self.assertEqual(3, sizecounts[small.id])
640         self.assertEqual(1, sizecounts[big.id])
641
642     def test_node_max_nodes_two_sizes(self):
643         small = testutil.MockSize(1)
644         big = testutil.MockSize(2)
645         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
646                         (testutil.MockSize(2), {"cores":2})]
647         self.make_daemon(want_sizes=[small, small, small, big],
648                          avail_sizes=avail_sizes, max_nodes=3)
649
650         # the daemon runs in another thread, so we need to wait and see
651         # if it does all the work we're expecting it to do before stopping it.
652         self.busywait(lambda: self.node_setup.start.call_count == 3)
653         booting = self.daemon.booting.get(self.TIMEOUT)
654         self.stop_proxy(self.daemon)
655         sizecounts = {a[0].id: 0 for a in avail_sizes}
656         for b in booting.itervalues():
657             sizecounts[b.cloud_size.get().id] += 1
658         self.assertEqual(2, sizecounts[small.id])
659         self.assertEqual(1, sizecounts[big.id])
660
661     def test_wishlist_reconfigure(self):
662         small = testutil.MockSize(1)
663         big = testutil.MockSize(2)
664         avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
665
666         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
667                                       testutil.cloud_node_mock(2, small),
668                                       testutil.cloud_node_mock(3, big)],
669                          arvados_nodes=[testutil.arvados_node_mock(1),
670                                         testutil.arvados_node_mock(2),
671                                         testutil.arvados_node_mock(3)],
672                          want_sizes=[small, small, big],
673                          avail_sizes=avail_sizes)
674         self.busywait(lambda: 3 == self.paired_monitor_count())
675         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
676
677         self.assertEqual(0, self.node_shutdown.start.call_count)
678
679         for c in self.daemon.cloud_nodes.get().nodes.itervalues():
680             self.daemon.node_can_shutdown(c.actor)
681
682         booting = self.daemon.booting.get()
683         cloud_nodes = self.daemon.cloud_nodes.get()
684
685         self.busywait(lambda: 1 == self.node_setup.start.call_count)
686         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
687
688         self.stop_proxy(self.daemon)
689
690         # booting a new big node
691         sizecounts = {a[0].id: 0 for a in avail_sizes}
692         for b in booting.itervalues():
693             sizecounts[b.cloud_size.get().id] += 1
694         self.assertEqual(0, sizecounts[small.id])
695         self.assertEqual(1, sizecounts[big.id])
696
697         # shutting down a small node
698         sizecounts = {a[0].id: 0 for a in avail_sizes}
699         for b in cloud_nodes.nodes.itervalues():
700             if b.shutdown_actor is not None:
701                 sizecounts[b.cloud_node.size.id] += 1
702         self.assertEqual(1, sizecounts[small.id])
703         self.assertEqual(0, sizecounts[big.id])
704
705     def test_node_max_price(self):
706         small = testutil.MockSize(1)
707         big = testutil.MockSize(2)
708         avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
709                         (testutil.MockSize(2), {"cores":2, "price":2})]
710         self.make_daemon(want_sizes=[small, small, small, big],
711                          avail_sizes=avail_sizes,
712                          max_nodes=4,
713                          max_total_price=4)
714         # the daemon runs in another thread, so we need to wait and see
715         # if it does all the work we're expecting it to do before stopping it.
716         self.busywait(lambda: self.node_setup.start.call_count == 3)
717         booting = self.daemon.booting.get()
718         self.stop_proxy(self.daemon)
719
720         sizecounts = {a[0].id: 0 for a in avail_sizes}
721         for b in booting.itervalues():
722             sizecounts[b.cloud_size.get().id] += 1
723         logging.info(sizecounts)
724
725         # Booting 3 small nodes and not booting a big node would also partially
726         # satisfy the wishlist and come in under the price cap, however the way
727         # the update_server_wishlist() currently works effectively results in a
728         # round-robin creation of one node of each size in the wishlist, so
729         # test for that.
730         self.assertEqual(2, sizecounts[small.id])
731         self.assertEqual(1, sizecounts[big.id])