Merge branch '8784-dir-listings'
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import time
9 import unittest
10
11 import mock
12 import pykka
13
14 import arvnodeman.daemon as nmdaemon
15 import arvnodeman.status as status
16 from arvnodeman.jobqueue import ServerCalculator
17 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
18 from . import testutil
19 from . import test_status
20 import logging
21
22 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
23                                      unittest.TestCase):
24     def mock_node_start(self, **kwargs):
25         # Make sure that every time the daemon starts a setup actor,
26         # it gets a new mock object back.
27         get_cloud_size = mock.MagicMock()
28         get_cloud_size.get.return_value = kwargs["cloud_size"]
29         mock_actor = mock.MagicMock()
30         mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
31                                           cloud_size=get_cloud_size,
32                                           actor_ref=mock_actor)
33         mock_actor.proxy.return_value = mock_proxy
34         mock_actor.tell_proxy.return_value = mock_proxy
35
36         self.last_setup = mock_proxy
37         return mock_actor
38
39     def mock_node_shutdown(self, **kwargs):
40         # Make sure that every time the daemon starts a shutdown actor,
41         # it gets a new mock object back.
42         get_cloud_node = mock.MagicMock()
43         if "node_monitor" in kwargs:
44             get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
45         mock_actor = mock.MagicMock()
46         mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
47                                           cloud_node=get_cloud_node,
48                                           actor_ref=mock_actor)
49
50         mock_actor.proxy.return_value = mock_proxy
51         self.last_shutdown = mock_proxy
52
53         return mock_actor
54
    def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
                    avail_sizes=None,
                    min_nodes=0, max_nodes=8,
                    shutdown_windows=[54, 5, 1],
                    max_total_price=None):
        # Start a NodeManagerDaemonActor wired entirely to mocks, store its
        # proxy in self.daemon, and feed it the initial node lists/wishlist.
        #
        # cloud_nodes, arvados_nodes, want_sizes: initial state pushed to the
        #   daemon after start; pass None to skip the corresponding update.
        # avail_sizes: list of (size, spec-dict) pairs given to
        #   ServerCalculator; derived from the other arguments when falsy.
        # min_nodes, max_nodes, shutdown_windows, max_total_price: forwarded
        #   to the daemon unchanged.
        #
        # NOTE(review): the list defaults are shared between calls (mutable
        # default arguments).  Nothing here mutates them, but they are passed
        # on to the daemon -- confirm nothing downstream modifies them.

        # Pollers are plain mocks exposed as self.<name>_poller.
        for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
            setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))

        if not avail_sizes:
            if cloud_nodes or want_sizes:
                # One entry per existing cloud node (cores taken from the
                # node's numeric id) plus a 1-core entry per wished-for size.
                avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
            else:
                avail_sizes=[(testutil.MockSize(1), {"cores": 1})]

        self.arv_factory = mock.MagicMock(name='arvados_mock')
        api_client = mock.MagicMock(name='api_client')
        # Successive node-creation calls return Arvados node 1, then node 2.
        api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
                                                           testutil.arvados_node_mock(2)]
        self.arv_factory.return_value = api_client

        self.cloud_factory = mock.MagicMock(name='cloud_mock')
        self.cloud_factory().node_start_time.return_value = time.time()
        self.cloud_updates = mock.MagicMock(name='updates_mock')
        # Timer messages are held until a test calls self.timer.deliver().
        self.timer = testutil.MockTimer(deliver_immediately=False)
        self.cloud_factory().node_id.side_effect = lambda node: node.id
        self.cloud_factory().broken.return_value = False

        # Every start() call yields a fresh mock actor; see mock_node_start.
        self.node_setup = mock.MagicMock(name='setup_mock')
        self.node_setup.start.side_effect = self.mock_node_start
        self.node_setup.reset_mock()

        # Every start() call yields a fresh mock actor; see mock_node_shutdown.
        self.node_shutdown = mock.MagicMock(name='shutdown_mock')
        self.node_shutdown.start.side_effect = self.mock_node_shutdown

        # NOTE(review): 600, 1800 and 3600 are positional timing arguments;
        # presumably poll-stale / boot-fail / node-stale thresholds in
        # seconds -- confirm against NodeManagerDaemonActor.__init__.
        self.daemon = nmdaemon.NodeManagerDaemonActor.start(
            self.server_wishlist_poller, self.arvados_nodes_poller,
            self.cloud_nodes_poller, self.cloud_updates, self.timer,
            self.arv_factory, self.cloud_factory,
            shutdown_windows, ServerCalculator(avail_sizes),
            min_nodes, max_nodes, 600, 1800, 3600,
            self.node_setup, self.node_shutdown,
            max_total_price=max_total_price).proxy()
        # Each update is awaited, so the daemon has processed it before the
        # next one is sent and before this method returns.
        if arvados_nodes is not None:
            self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
        if cloud_nodes is not None:
            self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
        if want_sizes is not None:
            self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
103
104     def monitor_list(self):
105         return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
106
107     def monitored_arvados_nodes(self):
108         pairings = []
109         for future in [actor.proxy().arvados_node
110                        for actor in self.monitor_list()]:
111             try:
112                 pairings.append(future.get(self.TIMEOUT))
113             except pykka.ActorDeadError:
114                 pass
115         return pairings
116
117     def alive_monitor_count(self):
118         return len(self.monitored_arvados_nodes())
119
120     def assertShutdownCancellable(self, expected=True):
121         self.assertTrue(self.node_shutdown.start.called)
122         self.assertIs(expected,
123                       self.node_shutdown.start.call_args[1]['cancellable'],
124                       "ComputeNodeShutdownActor incorrectly cancellable")
125
126     def test_easy_node_creation(self):
127         size = testutil.MockSize(1)
128         self.make_daemon(want_sizes=[size])
129         self.stop_proxy(self.daemon)
130         self.assertTrue(self.node_setup.start.called)
131
132     def check_monitors_arvados_nodes(self, *arv_nodes):
133         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
134
135     def test_node_pairing(self):
136         cloud_node = testutil.cloud_node_mock(1)
137         arv_node = testutil.arvados_node_mock(1)
138         self.make_daemon([cloud_node], [arv_node])
139         self.stop_proxy(self.daemon)
140         self.check_monitors_arvados_nodes(arv_node)
141
142     def test_node_pairing_after_arvados_update(self):
143         cloud_node = testutil.cloud_node_mock(2)
144         self.make_daemon([cloud_node],
145                          [testutil.arvados_node_mock(1, ip_address=None)])
146         arv_node = testutil.arvados_node_mock(2)
147         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
148         self.stop_proxy(self.daemon)
149         self.check_monitors_arvados_nodes(arv_node)
150
    def test_arvados_node_un_and_re_paired(self):
        # An Arvados node whose cloud node disappears from the listing and
        # then reappears is monitored again afterwards.
        #
        # We need to create the Arvados node mock after spinning up the daemon
        # to make sure it's new enough to pair with the cloud node.
        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
                         arvados_nodes=None)
        arv_node = testutil.arvados_node_mock(3)
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.check_monitors_arvados_nodes(arv_node)
        # Delisting the cloud node leaves no live monitor.
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.assertEqual(0, self.alive_monitor_count())
        # Relist a fresh mock of the same cloud node.  The call is not
        # awaited here; stop_proxy follows it before the final check.
        self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
        self.stop_proxy(self.daemon)
        self.check_monitors_arvados_nodes(arv_node)
164
165     def test_old_arvados_node_not_double_assigned(self):
166         arv_node = testutil.arvados_node_mock(3, age=9000)
167         size = testutil.MockSize(3)
168         self.make_daemon(arvados_nodes=[arv_node],
169                          avail_sizes=[(size, {"cores":1})])
170         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
171         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
172         self.stop_proxy(self.daemon)
173         used_nodes = [call[1].get('arvados_node')
174                       for call in self.node_setup.start.call_args_list]
175         self.assertEqual(2, len(used_nodes))
176         self.assertIn(arv_node, used_nodes)
177         self.assertIn(None, used_nodes)
178
179     def test_node_count_satisfied(self):
180         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
181                          want_sizes=[testutil.MockSize(1)])
182         self.stop_proxy(self.daemon)
183         self.assertFalse(self.node_setup.start.called)
184
185     def test_dont_count_missing_as_busy(self):
186         size = testutil.MockSize(1)
187         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
188                                       testutil.cloud_node_mock(2, size=size)],
189                          arvados_nodes=[testutil.arvados_node_mock(1),
190                                         testutil.arvados_node_mock(
191                                             2,
192                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
193                          want_sizes=[size, size])
194         self.stop_proxy(self.daemon)
195         self.assertTrue(self.node_setup.start.called)
196
197     def test_missing_counts_towards_max(self):
198         size = testutil.MockSize(1)
199         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
200                                       testutil.cloud_node_mock(2, size=size)],
201                          arvados_nodes=[testutil.arvados_node_mock(1),
202                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
203                          want_sizes=[size, size],
204                          max_nodes=2)
205         self.stop_proxy(self.daemon)
206         self.assertFalse(self.node_setup.start.called)
207
208     def test_excess_counts_missing(self):
209         size = testutil.MockSize(1)
210         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
211         self.make_daemon(cloud_nodes=cloud_nodes,
212                          arvados_nodes=[testutil.arvados_node_mock(1),
213                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
214                          want_sizes=[size])
215         self.assertEqual(2, self.alive_monitor_count())
216         for mon_ref in self.monitor_list():
217             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
218         self.assertEqual(1, self.node_shutdown.start.call_count)
219
    def test_missing_shutdown_not_excess(self):
        # When the node with the 1970 last-ping already has a shutdown actor
        # attached, offering both monitors for shutdown must not start any
        # further shutdown actor.
        size = testutil.MockSize(1)
        cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
        self.make_daemon(cloud_nodes=cloud_nodes,
                         arvados_nodes=[testutil.arvados_node_mock(1),
                                        testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                         want_sizes=[size])

        # Build a fake monitor whose proxy reports cloud node 2, and use it
        # to create a shutdown actor by calling node_shutdown.start directly.
        # This is the single start() call counted by the final assertion.
        get_cloud_node = mock.MagicMock(name="get_cloud_node")
        get_cloud_node.get.return_value = cloud_nodes[1]
        mock_node_monitor = mock.MagicMock()
        mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
        mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)

        # Attach that shutdown actor to the daemon's record for cloud node 2.
        self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()

        self.assertEqual(2, self.alive_monitor_count())
        for mon_ref in self.monitor_list():
            self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
        # Still 1: the daemon itself started no shutdown actor.
        self.assertEqual(1, self.node_shutdown.start.call_count)
240
241     def test_booting_nodes_counted(self):
242         cloud_node = testutil.cloud_node_mock(1)
243         arv_node = testutil.arvados_node_mock(1)
244         server_wishlist = [testutil.MockSize(1)] * 2
245         self.make_daemon([cloud_node], [arv_node], server_wishlist)
246         self.daemon.max_nodes.get(self.TIMEOUT)
247         self.assertTrue(self.node_setup.start.called)
248         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
249         self.stop_proxy(self.daemon)
250         self.assertEqual(1, self.node_setup.start.call_count)
251
252     def test_boot_new_node_when_all_nodes_busy(self):
253         size = testutil.MockSize(2)
254         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
255         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
256                          [size], avail_sizes=[(size, {"cores":1})])
257         self.busywait(lambda: self.node_setup.start.called)
258         self.stop_proxy(self.daemon)
259         self.assertTrue(self.node_setup.start.called)
260
261     def test_boot_new_node_below_min_nodes(self):
262         min_size = testutil.MockSize(1)
263         wish_size = testutil.MockSize(3)
264         avail_sizes = [(min_size, {"cores": 1}),
265                        (wish_size, {"cores": 3})]
266         self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
267         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
268         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
269         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
270         self.stop_proxy(self.daemon)
271         self.assertEqual([wish_size, min_size],
272                          [call[1].get('cloud_size')
273                           for call in self.node_setup.start.call_args_list])
274
275     def test_no_new_node_when_ge_min_nodes_busy(self):
276         size = testutil.MockSize(2)
277         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
278         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
279                      for n in range(1, 4)]
280         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
281         self.stop_proxy(self.daemon)
282         self.assertEqual(0, self.node_setup.start.call_count)
283
284     def test_no_new_node_when_max_nodes_busy(self):
285         size = testutil.MockSize(3)
286         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
287                          arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
288                          want_sizes=[size],
289                          max_nodes=1)
290         self.stop_proxy(self.daemon)
291         self.assertFalse(self.node_setup.start.called)
292
293     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
294         if cloud_node is None:
295             cloud_node = testutil.cloud_node_mock(id_num)
296         id_num = int(cloud_node.id)
297         if arv_node is None:
298             arv_node = testutil.arvados_node_mock(id_num)
299         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
300                          avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
301         self.daemon.max_nodes.get(self.TIMEOUT)
302         self.assertEqual(1, self.node_setup.start.call_count)
303         self.last_setup.cloud_node.get.return_value = cloud_node
304         self.last_setup.arvados_node.get.return_value = arv_node
305         return self.last_setup
306
    def test_new_node_when_booted_node_not_usable(self):
        # A booted node whose Arvados record says crunch_worker_state='down'
        # does not satisfy the wishlist once its start time is old enough:
        # a second setup actor is started.
        cloud_node = testutil.cloud_node_mock(4)
        arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
        setup = self.start_node_boot(cloud_node, arv_node)
        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        # These two updates are not awaited; the awaited wishlist update
        # below is sent after them.
        self.daemon.update_arvados_nodes([arv_node])
        self.daemon.update_cloud_nodes([cloud_node])
        # Backdate the monitor's start time to 1801 s ago, one second past
        # the 1800 passed to the daemon in make_daemon (presumably the
        # boot-failure threshold -- confirm).
        self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
        self.daemon.update_server_wishlist(
            [testutil.MockSize(4)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        # One start from start_node_boot plus one replacement.
        self.assertEqual(2, self.node_setup.start.call_count)
320
    def test_no_duplication_when_booting_node_listed_fast(self):
        # Test that we don't start two ComputeNodeMonitorActors when
        # we learn about a booting node through a listing before we
        # get the "node up" message from CloudNodeSetupActor.
        cloud_node = testutil.cloud_node_mock(1)
        setup = self.start_node_boot(cloud_node)
        # Listing first (not awaited), then the setup-finished message.
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
330
    def test_no_duplication_when_booted_node_listed(self):
        # Reverse order of the "listed fast" case: the setup-finished message
        # is sent first (not awaited), then the cloud listing containing the
        # same node.  Only one monitor may exist afterwards.
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node, id_num=2)
        self.daemon.node_setup_finished(setup)
        self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
337
    def test_node_counted_after_boot_with_slow_listing(self):
        # Test that, after we boot a compute node, we assume it exists
        # even it doesn't appear in the listing (e.g., because of delays
        # propagating tags).
        setup = self.start_node_boot()
        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        # An empty cloud listing must not remove the freshly booted node's
        # monitor.
        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
347
    def test_booted_unlisted_node_counted(self):
        # A node that finished booting but has never appeared in a cloud
        # listing still satisfies a one-entry wishlist: no second setup
        # actor is started.
        setup = self.start_node_boot(id_num=1)
        # Not awaited; the awaited wishlist update below is sent after it.
        self.daemon.node_setup_finished(setup)
        self.daemon.update_server_wishlist(
            [testutil.MockSize(1)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.node_setup.start.call_count)
355
    def test_booted_node_can_shutdown(self):
        # Once the wishlist is empty, a booted node that offers to shut down
        # gets a shutdown actor, and the status server reports matching
        # counters.
        setup = self.start_node_boot()
        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        # Empty the wishlist (not awaited) before the shutdown offer.
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertTrue(self.node_shutdown.start.called,
                        "daemon did not shut down booted node on offer")

        # Each get_status() call fetches the status anew from the test
        # server.
        with test_status.TestServer() as srv:
            self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
            self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
            self.assertEqual(0, srv.get_status().get('nodes_wish', None))
372
    def test_booted_node_lifecycle(self):
        # Walk one node through boot, shutdown offer, finished shutdown and
        # delisting, then check that a new wishlist entry starts a node.
        cloud_node = testutil.cloud_node_mock(6)
        setup = self.start_node_boot(cloud_node, id_num=6)
        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.update_server_wishlist([])
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        # Must be checked before the test's own start() call below, which
        # replaces node_shutdown.start.call_args.
        self.assertShutdownCancellable(True)
        # Calling start() here yields a fresh mock shutdown actor (see
        # mock_node_shutdown); script it to report the booted cloud node.
        shutdown = self.node_shutdown.start().proxy()
        shutdown.cloud_node.get.return_value = cloud_node
        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
        self.daemon.update_cloud_nodes([])
        self.assertTrue(shutdown.stop.called,
                        "shutdown actor not stopped after finishing")
        self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
                        "monitor for booted node not stopped after shutdown")
        self.daemon.update_server_wishlist(
            [testutil.MockSize(2)]).get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        # NOTE(review): start_node_boot already triggered one start() call,
        # so this assertion is true before the final wishlist update too.
        self.assertTrue(self.node_setup.start.called,
                        "second node not started after booted node stopped")
395
396     def test_booted_node_shut_down_when_never_listed(self):
397         setup = self.start_node_boot()
398         self.cloud_factory().node_start_time.return_value = time.time() - 3601
399         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
400         self.assertEqual(1, self.alive_monitor_count())
401         self.assertFalse(self.node_shutdown.start.called)
402         now = time.time()
403         self.monitor_list()[0].tell_proxy().consider_shutdown()
404         self.busywait(lambda: self.node_shutdown.start.called)
405         self.stop_proxy(self.daemon)
406         self.assertShutdownCancellable(False)
407
    def test_booted_node_shut_down_when_never_paired(self):
        # A booted node that is listed by the cloud but never sent an Arvados
        # node update is shut down non-cancellably once it is old enough.
        cloud_node = testutil.cloud_node_mock(2)
        setup = self.start_node_boot(cloud_node)
        # Report a start time 3601 s in the past, one second beyond the 3600
        # passed to the daemon in make_daemon (presumably the stale-node
        # threshold -- confirm).
        self.cloud_factory().node_start_time.return_value = time.time() - 3601
        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        # tell_proxy() is fire-and-forget, so poll until the shutdown actor
        # has been started.
        self.monitor_list()[0].tell_proxy().consider_shutdown()
        self.busywait(lambda: self.node_shutdown.start.called)
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)
419
    def test_booted_node_shut_down_when_never_working(self):
        # A booted node whose Arvados record says crunch_worker_state='down'
        # is shut down non-cancellably once it is old enough.
        cloud_node = testutil.cloud_node_mock(4)
        arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
        setup = self.start_node_boot(cloud_node, arv_node)
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        # Backdate the monitor's start time to 3601 s ago, one second beyond
        # the 3600 passed to the daemon in make_daemon (presumably the
        # stale-node threshold -- confirm).
        self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
        # No explicit consider_shutdown() here; the test waits for the
        # shutdown actor to start after this cloud listing update.
        self.daemon.update_cloud_nodes([cloud_node])
        self.busywait(lambda: self.node_shutdown.start.called)
        self.stop_proxy(self.daemon)
        self.assertShutdownCancellable(False)
432
    def test_node_that_pairs_not_considered_failed_boot(self):
        # A booted node that appears in both the cloud and Arvados listings
        # is not shut down when the pending timer messages are delivered.
        cloud_node = testutil.cloud_node_mock(3)
        arv_node = testutil.arvados_node_mock(3)
        setup = self.start_node_boot(cloud_node, arv_node)
        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        # The timer was created with deliver_immediately=False, so scheduled
        # messages only go out here.
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)
444
    def test_node_that_pairs_busy_not_considered_failed_boot(self):
        # Same as the plain pairing case, but the Arvados node has a job
        # (job_uuid=True): it must not be shut down either.
        cloud_node = testutil.cloud_node_mock(5)
        arv_node = testutil.arvados_node_mock(5, job_uuid=True)
        setup = self.start_node_boot(cloud_node, arv_node)
        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())
        self.daemon.update_cloud_nodes([cloud_node])
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        # The timer was created with deliver_immediately=False, so scheduled
        # messages only go out here.
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        self.assertFalse(self.node_shutdown.start.called)
456
457     def test_booting_nodes_shut_down(self):
458         self.make_daemon(want_sizes=[testutil.MockSize(1)])
459         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
460         self.stop_proxy(self.daemon)
461         self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
462
    def test_all_booting_nodes_tried_to_shut_down(self):
        # With two nodes booting and the wishlist emptied, the daemon asks
        # both setup actors to stop, including the one that declines.
        size = testutil.MockSize(2)
        self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
        self.daemon.max_nodes.get(self.TIMEOUT)
        setup1 = self.last_setup
        # First booting node refuses to stop.  Configuring the return value
        # records a call on the mock, so reset it to clear the call history.
        setup1.stop_if_no_cloud_node().get.return_value = False
        setup1.stop_if_no_cloud_node.reset_mock()
        self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
        self.daemon.max_nodes.get(self.TIMEOUT)
        # A second, distinct setup actor must have been started.
        self.assertIsNot(setup1, self.last_setup)
        # Second booting node agrees to stop.
        self.last_setup.stop_if_no_cloud_node().get.return_value = True
        self.last_setup.stop_if_no_cloud_node.reset_mock()
        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
        self.daemon.max_nodes.get(self.TIMEOUT)
        self.stop_proxy(self.daemon)
        self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
        self.assertTrue(setup1.stop_if_no_cloud_node.called)
480
481     def test_shutdown_declined_at_wishlist_capacity(self):
482         cloud_node = testutil.cloud_node_mock(1)
483         arv_node = testutil.arvados_node_mock(1)
484         size = testutil.MockSize(1)
485         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
486         self.assertEqual(1, self.alive_monitor_count())
487         monitor = self.monitor_list()[0].proxy()
488         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
489         self.stop_proxy(self.daemon)
490         self.assertFalse(self.node_shutdown.start.called)
491
492     def test_shutdown_declined_below_min_nodes(self):
493         cloud_node = testutil.cloud_node_mock(1)
494         arv_node = testutil.arvados_node_mock(1)
495         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
496         self.assertEqual(1, self.alive_monitor_count())
497         monitor = self.monitor_list()[0].proxy()
498         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
499         self.stop_proxy(self.daemon)
500         self.assertFalse(self.node_shutdown.start.called)
501
502     def test_shutdown_accepted_below_capacity(self):
503         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
504         self.assertEqual(1, self.alive_monitor_count())
505         monitor = self.monitor_list()[0].proxy()
506         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
507         self.stop_proxy(self.daemon)
508         self.assertTrue(self.node_shutdown.start.called)
509
510     def test_shutdown_declined_when_idle_and_job_queued(self):
511         size = testutil.MockSize(1)
512         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
513         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
514                      testutil.arvados_node_mock(4, job_uuid=None)]
515         self.make_daemon(cloud_nodes, arv_nodes, [size])
516         self.assertEqual(2, self.alive_monitor_count())
517         for mon_ref in self.monitor_list():
518             monitor = mon_ref.proxy()
519             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
520                 break
521         else:
522             self.fail("monitor for idle node not found")
523         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
524         self.stop_proxy(self.daemon)
525         self.assertFalse(self.node_shutdown.start.called)
526
    def test_node_shutdown_after_cancelled_shutdown(self):
        # A shutdown that reports failure leaves the monitor alive; a second
        # shutdown of the same node that succeeds removes it.
        cloud_node = testutil.cloud_node_mock(5)
        self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
        self.assertEqual(1, self.alive_monitor_count())
        monitor = self.monitor_list()[0].proxy()
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        # First attempt: the shutdown actor reports success=False.
        self.last_shutdown.success.get.return_value = False
        self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
        self.assertEqual(1, self.alive_monitor_count())

        # Second attempt: self.last_shutdown now refers to the mock created
        # for this new offer.
        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
        self.last_shutdown.success.get.return_value = True
        # Stopping the mock shutdown actor also stops the real monitor.
        self.last_shutdown.stop.side_effect = lambda: monitor.stop()
        self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
        self.assertEqual(0, self.alive_monitor_count())
542
543     def test_nodes_shutting_down_replaced_below_max_nodes(self):
544         size = testutil.MockSize(6)
545         cloud_node = testutil.cloud_node_mock(6, size=size)
546         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
547                          avail_sizes=[(size, {"cores":1})])
548         self.assertEqual(1, self.alive_monitor_count())
549         monitor = self.monitor_list()[0].proxy()
550         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
551         self.assertTrue(self.node_shutdown.start.called)
552         self.daemon.update_server_wishlist(
553             [testutil.MockSize(6)]).get(self.TIMEOUT)
554         self.stop_proxy(self.daemon)
555         self.assertTrue(self.node_setup.start.called)
556
557     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
558         cloud_node = testutil.cloud_node_mock(7)
559         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
560                          max_nodes=1)
561         self.assertEqual(1, self.alive_monitor_count())
562         monitor = self.monitor_list()[0].proxy()
563         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
564         self.assertTrue(self.node_shutdown.start.called)
565         self.daemon.update_server_wishlist(
566             [testutil.MockSize(7)]).get(self.TIMEOUT)
567         self.stop_proxy(self.daemon)
568         self.assertFalse(self.node_setup.start.called)
569
570     def test_nodes_shutting_down_count_against_excess(self):
571         size = testutil.MockSize(8)
572         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
573         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
574         self.make_daemon(cloud_nodes, arv_nodes, [size],
575                          avail_sizes=[(size, {"cores":1})])
576         self.assertEqual(2, self.alive_monitor_count())
577         for mon_ref in self.monitor_list():
578             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
579         self.assertEqual(1, self.node_shutdown.start.call_count)
580
    def test_clean_shutdown_waits_for_node_setup_finish(self):
        # During daemon shutdown, a booting node that refuses to stop is
        # asked to; once its setup finishes the setup actor is stopped and
        # the daemon itself stops after timer delivery.
        new_node = self.start_node_boot()
        # The setup actor declines to stop.  Configuring the return value
        # records a call on the mock, so reset it to clear the call history.
        new_node.stop_if_no_cloud_node().get.return_value = False
        new_node.stop_if_no_cloud_node.reset_mock()
        self.daemon.shutdown().get(self.TIMEOUT)
        self.assertTrue(new_node.stop_if_no_cloud_node.called)
        self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
        self.assertTrue(new_node.stop.called)
        # The timer was created with deliver_immediately=False, so scheduled
        # messages only go out here.
        self.timer.deliver()
        self.assertTrue(
            self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
592
    def test_wishlist_ignored_after_shutdown(self):
        # After shutdown() has been requested, a larger wishlist must not
        # start any additional setup actor.
        new_node = self.start_node_boot()
        # The setup actor declines to stop.  Configuring the return value
        # records a call on the mock, so reset it to clear the call history.
        new_node.stop_if_no_cloud_node().get.return_value = False
        new_node.stop_if_no_cloud_node.reset_mock()
        self.daemon.shutdown().get(self.TIMEOUT)
        size = testutil.MockSize(2)
        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
        self.timer.deliver()
        self.stop_proxy(self.daemon)
        # Only the start made by start_node_boot is recorded.
        self.assertEqual(1, self.node_setup.start.call_count)
603
604     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
605         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
606         self.assertEqual(1, self.alive_monitor_count())
607         monitor = self.monitor_list()[0].proxy()
608         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
609         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
610         self.stop_proxy(self.daemon)
611         self.assertEqual(
612             1, self.last_shutdown.stop.call_count)
613
614     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
615         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
616         self.assertEqual(1, self.alive_monitor_count())
617         monitor = self.monitor_list()[0].proxy()
618         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
619         # We're mainly testing that update_cloud_nodes catches and handles
620         # the ActorDeadError.
621         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
622         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
623         self.stop_proxy(self.daemon)
624         self.assertEqual(1, self.last_shutdown.stop.call_count)
625
626     def test_node_create_two_sizes(self):
627         small = testutil.MockSize(1)
628         big = testutil.MockSize(2)
629         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
630                         (testutil.MockSize(2), {"cores":2})]
631         self.make_daemon(want_sizes=[small, small, small, big],
632                          avail_sizes=avail_sizes, max_nodes=4)
633
634         # the daemon runs in another thread, so we need to wait and see
635         # if it does all the work we're expecting it to do before stopping it.
636         self.busywait(lambda: self.node_setup.start.call_count == 4)
637         booting = self.daemon.booting.get(self.TIMEOUT)
638         self.stop_proxy(self.daemon)
639         sizecounts = {a[0].id: 0 for a in avail_sizes}
640         for b in booting.itervalues():
641             sizecounts[b.cloud_size.get().id] += 1
642         logging.info(sizecounts)
643         self.assertEqual(3, sizecounts[small.id])
644         self.assertEqual(1, sizecounts[big.id])
645
646     def test_node_max_nodes_two_sizes(self):
647         small = testutil.MockSize(1)
648         big = testutil.MockSize(2)
649         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
650                         (testutil.MockSize(2), {"cores":2})]
651         self.make_daemon(want_sizes=[small, small, small, big],
652                          avail_sizes=avail_sizes, max_nodes=3)
653
654         # the daemon runs in another thread, so we need to wait and see
655         # if it does all the work we're expecting it to do before stopping it.
656         self.busywait(lambda: self.node_setup.start.call_count == 3)
657         booting = self.daemon.booting.get(self.TIMEOUT)
658         self.stop_proxy(self.daemon)
659         sizecounts = {a[0].id: 0 for a in avail_sizes}
660         for b in booting.itervalues():
661             sizecounts[b.cloud_size.get().id] += 1
662         self.assertEqual(2, sizecounts[small.id])
663         self.assertEqual(1, sizecounts[big.id])
664
665     def test_wishlist_reconfigure(self):
666         small = testutil.MockSize(1)
667         big = testutil.MockSize(2)
668         avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
669
670         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
671                                       testutil.cloud_node_mock(2, small),
672                                       testutil.cloud_node_mock(3, big)],
673                          arvados_nodes=[testutil.arvados_node_mock(1),
674                                         testutil.arvados_node_mock(2),
675                                         testutil.arvados_node_mock(3)],
676                          want_sizes=[small, small, big],
677                          avail_sizes=avail_sizes)
678
679         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
680
681         self.assertEqual(0, self.node_shutdown.start.call_count)
682
683         for c in self.daemon.cloud_nodes.get().nodes.itervalues():
684             self.daemon.node_can_shutdown(c.actor)
685
686         booting = self.daemon.booting.get()
687         cloud_nodes = self.daemon.cloud_nodes.get()
688
689         self.stop_proxy(self.daemon)
690
691         self.assertEqual(1, self.node_setup.start.call_count)
692         self.assertEqual(1, self.node_shutdown.start.call_count)
693
694         # booting a new big node
695         sizecounts = {a[0].id: 0 for a in avail_sizes}
696         for b in booting.itervalues():
697             sizecounts[b.cloud_size.get().id] += 1
698         self.assertEqual(0, sizecounts[small.id])
699         self.assertEqual(1, sizecounts[big.id])
700
701         # shutting down a small node
702         sizecounts = {a[0].id: 0 for a in avail_sizes}
703         for b in cloud_nodes.nodes.itervalues():
704             if b.shutdown_actor is not None:
705                 sizecounts[b.cloud_node.size.id] += 1
706         self.assertEqual(1, sizecounts[small.id])
707         self.assertEqual(0, sizecounts[big.id])
708
709     def test_node_max_price(self):
710         small = testutil.MockSize(1)
711         big = testutil.MockSize(2)
712         avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
713                         (testutil.MockSize(2), {"cores":2, "price":2})]
714         self.make_daemon(want_sizes=[small, small, small, big],
715                          avail_sizes=avail_sizes,
716                          max_nodes=4,
717                          max_total_price=4)
718         # the daemon runs in another thread, so we need to wait and see
719         # if it does all the work we're expecting it to do before stopping it.
720         self.busywait(lambda: self.node_setup.start.call_count == 3)
721         booting = self.daemon.booting.get()
722         self.stop_proxy(self.daemon)
723
724         sizecounts = {a[0].id: 0 for a in avail_sizes}
725         for b in booting.itervalues():
726             sizecounts[b.cloud_size.get().id] += 1
727         logging.info(sizecounts)
728
729         # Booting 3 small nodes and not booting a big node would also partially
730         # satisfy the wishlist and come in under the price cap, however the way
731         # the update_server_wishlist() currently works effectively results in a
732         # round-robin creation of one node of each size in the wishlist, so
733         # test for that.
734         self.assertEqual(2, sizecounts[small.id])
735         self.assertEqual(1, sizecounts[big.id])