ebe7408e705b02e2d55b2d757ef5367953f23242
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import time
9 import unittest
10
11 import mock
12 import pykka
13
14 import arvnodeman.daemon as nmdaemon
15 import arvnodeman.status as status
16 from arvnodeman.jobqueue import ServerCalculator
17 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
18 from . import testutil
19 from . import test_status
20 import logging
21
22 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
23                                      unittest.TestCase):
24
25     def busywait(self, f):
26         n = 0
27         while not f() and n < 200:
28             time.sleep(.1)
29             self.daemon.ping().get(self.TIMEOUT)
30             n += 1
31         self.assertTrue(f())
32
33     def mock_node_start(self, **kwargs):
34         # Make sure that every time the daemon starts a setup actor,
35         # it gets a new mock object back.
36         get_cloud_size = mock.MagicMock()
37         get_cloud_size.get.return_value = kwargs["cloud_size"]
38         mock_actor = mock.MagicMock()
39         mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
40                                           cloud_size=get_cloud_size,
41                                           actor_ref=mock_actor)
42         mock_actor.proxy.return_value = mock_proxy
43         mock_actor.tell_proxy.return_value = mock_proxy
44
45         self.last_setup = mock_proxy
46         return mock_actor
47
48     def mock_node_shutdown(self, **kwargs):
49         # Make sure that every time the daemon starts a shutdown actor,
50         # it gets a new mock object back.
51         get_cloud_node = mock.MagicMock()
52         if "node_monitor" in kwargs:
53             get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
54         mock_actor = mock.MagicMock()
55         mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
56                                           cloud_node=get_cloud_node,
57                                           actor_ref=mock_actor)
58
59         mock_actor.proxy.return_value = mock_proxy
60         self.last_shutdown = mock_proxy
61
62         return mock_actor
63
64     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
65                     avail_sizes=None,
66                     min_nodes=0, max_nodes=8,
67                     shutdown_windows=[54, 5, 1],
68                     max_total_price=None):
69         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
70             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
71
72         if not avail_sizes:
73             if cloud_nodes or want_sizes:
74                 avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
75             else:
76                 avail_sizes=[(testutil.MockSize(1), {"cores": 1})]
77
78         self.arv_factory = mock.MagicMock(name='arvados_mock')
79         api_client = mock.MagicMock(name='api_client')
80         api_client.nodes().create().execute.side_effect = \
81             [testutil.arvados_node_mock(1),
82              testutil.arvados_node_mock(2)]
83         self.arv_factory.return_value = api_client
84
85         self.cloud_factory = mock.MagicMock(name='cloud_mock')
86         self.cloud_factory().node_start_time.return_value = time.time()
87         self.cloud_updates = mock.MagicMock(name='updates_mock')
88         self.timer = testutil.MockTimer(deliver_immediately=False)
89         self.cloud_factory().node_id.side_effect = lambda node: node.id
90         self.cloud_factory().broken.return_value = False
91
92         self.node_setup = mock.MagicMock(name='setup_mock')
93         self.node_setup.start.side_effect = self.mock_node_start
94         self.node_setup.reset_mock()
95
96         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
97         self.node_shutdown.start.side_effect = self.mock_node_shutdown
98
99         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
100             self.server_wishlist_poller, self.arvados_nodes_poller,
101             self.cloud_nodes_poller, self.cloud_updates, self.timer,
102             self.arv_factory, self.cloud_factory,
103             shutdown_windows, ServerCalculator(avail_sizes),
104             min_nodes, max_nodes, 600, 1800, 3600,
105             self.node_setup, self.node_shutdown,
106             max_total_price=max_total_price).proxy()
107         if arvados_nodes is not None:
108             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
109         if cloud_nodes is not None:
110             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
111         if want_sizes is not None:
112             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
113
114     def monitor_list(self):
115         return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
116
117     def monitored_arvados_nodes(self, include_unpaired=True):
118         pairings = []
119         for future in [actor.proxy().arvados_node
120                        for actor in self.monitor_list()]:
121             try:
122                 g = future.get(self.TIMEOUT)
123                 if g or include_unpaired:
124                     pairings.append(g)
125             except pykka.ActorDeadError:
126                 pass
127         return pairings
128
129     def alive_monitor_count(self):
130         return len(self.monitored_arvados_nodes())
131
132     def paired_monitor_count(self):
133         return len(self.monitored_arvados_nodes(False))
134
135     def assertShutdownCancellable(self, expected=True):
136         self.assertTrue(self.node_shutdown.start.called)
137         self.assertIs(expected,
138                       self.node_shutdown.start.call_args[1]['cancellable'],
139                       "ComputeNodeShutdownActor incorrectly cancellable")
140
141     def test_easy_node_creation(self):
142         size = testutil.MockSize(1)
143         self.make_daemon(want_sizes=[size])
144         self.busywait(lambda: self.node_setup.start.called)
145
146     def check_monitors_arvados_nodes(self, *arv_nodes):
147         self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
148         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
149
150     def test_node_pairing(self):
151         cloud_node = testutil.cloud_node_mock(1)
152         arv_node = testutil.arvados_node_mock(1)
153         self.make_daemon([cloud_node], [arv_node])
154         self.check_monitors_arvados_nodes(arv_node)
155
156     def test_node_pairing_after_arvados_update(self):
157         cloud_node = testutil.cloud_node_mock(2)
158         self.make_daemon([cloud_node],
159                          [testutil.arvados_node_mock(1, ip_address=None)])
160         arv_node = testutil.arvados_node_mock(2)
161         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
162         self.check_monitors_arvados_nodes(arv_node)
163
164     def test_arvados_node_un_and_re_paired(self):
165         # We need to create the Arvados node mock after spinning up the daemon
166         # to make sure it's new enough to pair with the cloud node.
167         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
168                          arvados_nodes=None)
169         arv_node = testutil.arvados_node_mock(3)
170         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
171         self.check_monitors_arvados_nodes(arv_node)
172         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
173         self.busywait(lambda: 0 == self.alive_monitor_count())
174         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
175         self.check_monitors_arvados_nodes(arv_node)
176
177     def test_old_arvados_node_not_double_assigned(self):
178         arv_node = testutil.arvados_node_mock(3, age=9000)
179         size = testutil.MockSize(3)
180         self.make_daemon(arvados_nodes=[arv_node],
181                          avail_sizes=[(size, {"cores":1})])
182         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
183         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
184         self.stop_proxy(self.daemon)
185         used_nodes = [call[1].get('arvados_node')
186                       for call in self.node_setup.start.call_args_list]
187         self.assertEqual(2, len(used_nodes))
188         self.assertIn(arv_node, used_nodes)
189         self.assertIn(None, used_nodes)
190
191     def test_node_count_satisfied(self):
192         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
193                          want_sizes=[testutil.MockSize(1)])
194         self.busywait(lambda: not self.node_setup.start.called)
195
196     def test_select_stale_node_records_with_slot_numbers_first(self):
197         """
198         Stale node records with slot_number assigned can exist when
199         clean_arvados_node() isn't executed after a node shutdown, for
200         various reasons.
201         NodeManagerDaemonActor should use these stale node records first, so
202         that they don't accumulate unused, reducing the slots available.
203         """
204         size = testutil.MockSize(1)
205         a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
206         arvados_nodes = []
207         for n in range(9):
208             # Add several stale node records without slot_number assigned
209             arvados_nodes.append(
210                 testutil.arvados_node_mock(
211                     n+1,
212                     slot_number=None,
213                     modified_at=a_long_time_ago))
214         # Add one record with stale_node assigned, it should be the
215         # first one selected
216         arv_node = testutil.arvados_node_mock(
217             123,
218             modified_at=a_long_time_ago)
219         arvados_nodes.append(arv_node)
220         cloud_node = testutil.cloud_node_mock(125, size=size)
221         self.make_daemon(cloud_nodes=[cloud_node],
222                          arvados_nodes=arvados_nodes)
223         arvados_nodes_tracker = self.daemon.arvados_nodes.get()
224         # Here, find_stale_node() should return the node record with
225         # the slot_number assigned.
226         self.assertEqual(arv_node,
227                          arvados_nodes_tracker.find_stale_node(3601))
228
229     def test_dont_count_missing_as_busy(self):
230         size = testutil.MockSize(1)
231         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
232                                       testutil.cloud_node_mock(2, size=size)],
233                          arvados_nodes=[testutil.arvados_node_mock(1),
234                                         testutil.arvados_node_mock(
235                                             2,
236                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
237                          want_sizes=[size, size])
238         self.busywait(lambda: 2 == self.alive_monitor_count())
239         self.busywait(lambda: self.node_setup.start.called)
240
241     def test_missing_counts_towards_max(self):
242         size = testutil.MockSize(1)
243         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
244                                       testutil.cloud_node_mock(2, size=size)],
245                          arvados_nodes=[testutil.arvados_node_mock(1),
246                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
247                          want_sizes=[size, size],
248                          max_nodes=2)
249         self.busywait(lambda: not self.node_setup.start.called)
250
251     def test_excess_counts_missing(self):
252         size = testutil.MockSize(1)
253         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
254         self.make_daemon(cloud_nodes=cloud_nodes,
255                          arvados_nodes=[testutil.arvados_node_mock(1),
256                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
257                          want_sizes=[size])
258         self.busywait(lambda: 2 == self.paired_monitor_count())
259         for mon_ref in self.monitor_list():
260             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
261         self.assertEqual(1, self.node_shutdown.start.call_count)
262
263     def test_missing_shutdown_not_excess(self):
264         size = testutil.MockSize(1)
265         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
266         self.make_daemon(cloud_nodes=cloud_nodes,
267                          arvados_nodes=[testutil.arvados_node_mock(1),
268                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
269                          want_sizes=[size])
270         self.busywait(lambda: 2 == self.paired_monitor_count())
271         get_cloud_node = mock.MagicMock(name="get_cloud_node")
272         get_cloud_node.get.return_value = cloud_nodes[1]
273         mock_node_monitor = mock.MagicMock()
274         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
275         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
276
277         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
278
279         self.busywait(lambda: 2 == self.alive_monitor_count())
280         for mon_ref in self.monitor_list():
281             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
282         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
283
284     def test_booting_nodes_counted(self):
285         cloud_node = testutil.cloud_node_mock(1)
286         arv_node = testutil.arvados_node_mock(1)
287         server_wishlist = [testutil.MockSize(1)] * 2
288         self.make_daemon([cloud_node], [arv_node], server_wishlist)
289         self.daemon.max_nodes.get(self.TIMEOUT)
290         self.assertTrue(self.node_setup.start.called)
291         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
292         self.busywait(lambda: 1 == self.node_setup.start.call_count)
293
294     def test_boot_new_node_when_all_nodes_busy(self):
295         size = testutil.MockSize(2)
296         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
297         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
298                          [size], avail_sizes=[(size, {"cores":1})])
299         self.busywait(lambda: 1 == self.paired_monitor_count())
300         self.busywait(lambda: self.node_setup.start.called)
301
302     def test_boot_new_node_below_min_nodes(self):
303         min_size = testutil.MockSize(1)
304         wish_size = testutil.MockSize(3)
305         avail_sizes = [(min_size, {"cores": 1}),
306                        (wish_size, {"cores": 3})]
307         self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
308         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
309         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
310         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
311         self.stop_proxy(self.daemon)
312         self.assertEqual([wish_size, min_size],
313                          [call[1].get('cloud_size')
314                           for call in self.node_setup.start.call_args_list])
315
316     def test_no_new_node_when_ge_min_nodes_busy(self):
317         size = testutil.MockSize(2)
318         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
319         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
320                      for n in range(1, 4)]
321         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
322         self.stop_proxy(self.daemon)
323         self.assertEqual(0, self.node_setup.start.call_count)
324
325     def test_no_new_node_when_max_nodes_busy(self):
326         size = testutil.MockSize(3)
327         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
328                          arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
329                          want_sizes=[size],
330                          max_nodes=1)
331         self.stop_proxy(self.daemon)
332         self.assertFalse(self.node_setup.start.called)
333
334     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
335         if cloud_node is None:
336             cloud_node = testutil.cloud_node_mock(id_num)
337         id_num = int(cloud_node.id)
338         if arv_node is None:
339             arv_node = testutil.arvados_node_mock(id_num)
340         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
341                          avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
342         self.daemon.max_nodes.get(self.TIMEOUT)
343         self.assertEqual(1, self.node_setup.start.call_count)
344         self.last_setup.cloud_node.get.return_value = cloud_node
345         self.last_setup.arvados_node.get.return_value = arv_node
346         return self.last_setup
347
348     def test_new_node_when_booted_node_not_usable(self):
349         cloud_node = testutil.cloud_node_mock(4)
350         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
351         setup = self.start_node_boot(cloud_node, arv_node)
352         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
353         self.assertEqual(1, self.alive_monitor_count())
354         self.daemon.update_arvados_nodes([arv_node])
355         self.daemon.update_cloud_nodes([cloud_node])
356         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
357         self.daemon.update_server_wishlist(
358             [testutil.MockSize(4)]).get(self.TIMEOUT)
359         self.stop_proxy(self.daemon)
360         self.assertEqual(2, self.node_setup.start.call_count)
361
362     def test_no_duplication_when_booting_node_listed_fast(self):
363         # Test that we don't start two ComputeNodeMonitorActors when
364         # we learn about a booting node through a listing before we
365         # get the "node up" message from CloudNodeSetupActor.
366         cloud_node = testutil.cloud_node_mock(1)
367         setup = self.start_node_boot(cloud_node)
368         self.daemon.update_cloud_nodes([cloud_node])
369         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
370         self.assertEqual(1, self.alive_monitor_count())
371
372     def test_no_duplication_when_booted_node_listed(self):
373         cloud_node = testutil.cloud_node_mock(2)
374         setup = self.start_node_boot(cloud_node, id_num=2)
375         self.daemon.node_setup_finished(setup)
376         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
377         self.assertEqual(1, self.alive_monitor_count())
378
379     def test_node_counted_after_boot_with_slow_listing(self):
380         # Test that, after we boot a compute node, we assume it exists
381         # even it doesn't appear in the listing (e.g., because of delays
382         # propagating tags).
383         setup = self.start_node_boot()
384         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
385         self.assertEqual(1, self.alive_monitor_count())
386         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
387         self.assertEqual(1, self.alive_monitor_count())
388
389     def test_booted_unlisted_node_counted(self):
390         setup = self.start_node_boot(id_num=1)
391         self.daemon.node_setup_finished(setup)
392         self.daemon.update_server_wishlist(
393             [testutil.MockSize(1)]).get(self.TIMEOUT)
394         self.stop_proxy(self.daemon)
395         self.assertEqual(1, self.node_setup.start.call_count)
396
397     def test_booted_node_can_shutdown(self):
398         setup = self.start_node_boot()
399         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
400         self.assertEqual(1, self.alive_monitor_count())
401         monitor = self.monitor_list()[0].proxy()
402         self.daemon.update_server_wishlist([])
403         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
404         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
405         self.stop_proxy(self.daemon)
406         self.assertTrue(self.node_shutdown.start.called,
407                         "daemon did not shut down booted node on offer")
408
409         with test_status.TestServer() as srv:
410             self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
411             self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
412             self.assertEqual(0, srv.get_status().get('nodes_wish', None))
413
414     def test_booted_node_lifecycle(self):
415         cloud_node = testutil.cloud_node_mock(6)
416         setup = self.start_node_boot(cloud_node, id_num=6)
417         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
418         self.assertEqual(1, self.alive_monitor_count())
419         monitor = self.monitor_list()[0].proxy()
420         self.daemon.update_server_wishlist([])
421         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
422         self.assertShutdownCancellable(True)
423         shutdown = self.node_shutdown.start().proxy()
424         shutdown.cloud_node.get.return_value = cloud_node
425         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
426         self.daemon.update_cloud_nodes([])
427         self.assertTrue(shutdown.stop.called,
428                         "shutdown actor not stopped after finishing")
429         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
430                         "monitor for booted node not stopped after shutdown")
431         self.daemon.update_server_wishlist(
432             [testutil.MockSize(2)]).get(self.TIMEOUT)
433         self.stop_proxy(self.daemon)
434         self.assertTrue(self.node_setup.start.called,
435                         "second node not started after booted node stopped")
436
437     def test_node_disappearing_during_shutdown(self):
438         cloud_node = testutil.cloud_node_mock(6)
439         setup = self.start_node_boot(cloud_node, id_num=6)
440         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
441         self.assertEqual(1, self.alive_monitor_count())
442         monitor = self.monitor_list()[0].proxy()
443         self.daemon.update_server_wishlist([])
444         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
445         self.assertShutdownCancellable(True)
446         shutdown = self.node_shutdown.start().proxy()
447         shutdown.cloud_node.get.return_value = cloud_node
448         # Simulate a successful but slow node destroy call: the cloud node
449         # list gets updated before the ShutdownActor finishes.
450         record = self.daemon.cloud_nodes.get().nodes.values()[0]
451         self.assertTrue(record.shutdown_actor is not None)
452         self.daemon.cloud_nodes.get().nodes.clear()
453         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
454         self.assertTrue(
455             record.shutdown_actor is not None,
456             "test was ineffective -- failed to simulate the race condition")
457
458     def test_booted_node_shut_down_when_never_listed(self):
459         setup = self.start_node_boot()
460         self.cloud_factory().node_start_time.return_value = time.time() - 3601
461         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
462         self.assertEqual(1, self.alive_monitor_count())
463         self.assertFalse(self.node_shutdown.start.called)
464         now = time.time()
465         self.monitor_list()[0].tell_proxy().consider_shutdown()
466         self.busywait(lambda: self.node_shutdown.start.called)
467         self.assertShutdownCancellable(False)
468
469     def test_booted_node_shut_down_when_never_paired(self):
470         cloud_node = testutil.cloud_node_mock(2)
471         setup = self.start_node_boot(cloud_node)
472         self.cloud_factory().node_start_time.return_value = time.time() - 3601
473         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
474         self.assertEqual(1, self.alive_monitor_count())
475         self.daemon.update_cloud_nodes([cloud_node])
476         self.monitor_list()[0].tell_proxy().consider_shutdown()
477         self.busywait(lambda: self.node_shutdown.start.called)
478         self.assertShutdownCancellable(False)
479
480     def test_booted_node_shut_down_when_never_working(self):
481         cloud_node = testutil.cloud_node_mock(4)
482         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
483         setup = self.start_node_boot(cloud_node, arv_node)
484         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
485         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
486         self.assertEqual(1, self.alive_monitor_count())
487         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
488         self.daemon.update_cloud_nodes([cloud_node])
489         self.busywait(lambda: self.node_shutdown.start.called)
490         self.assertShutdownCancellable(False)
491
492     def test_node_that_pairs_not_considered_failed_boot(self):
493         cloud_node = testutil.cloud_node_mock(3)
494         arv_node = testutil.arvados_node_mock(3)
495         setup = self.start_node_boot(cloud_node, arv_node)
496         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
497         self.assertEqual(1, self.alive_monitor_count())
498         self.daemon.update_cloud_nodes([cloud_node])
499         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
500         self.timer.deliver()
501         self.stop_proxy(self.daemon)
502         self.assertFalse(self.node_shutdown.start.called)
503
504     def test_node_that_pairs_busy_not_considered_failed_boot(self):
505         cloud_node = testutil.cloud_node_mock(5)
506         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
507         setup = self.start_node_boot(cloud_node, arv_node)
508         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
509         self.assertEqual(1, self.alive_monitor_count())
510         self.daemon.update_cloud_nodes([cloud_node])
511         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
512         self.timer.deliver()
513         self.stop_proxy(self.daemon)
514         self.assertFalse(self.node_shutdown.start.called)
515
516     def test_booting_nodes_shut_down(self):
517         self.make_daemon(want_sizes=[testutil.MockSize(1)])
518         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
519         self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
520
521     def test_all_booting_nodes_tried_to_shut_down(self):
522         size = testutil.MockSize(2)
523         self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
524         self.daemon.max_nodes.get(self.TIMEOUT)
525         setup1 = self.last_setup
526         setup1.stop_if_no_cloud_node().get.return_value = False
527         setup1.stop_if_no_cloud_node.reset_mock()
528         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
529         self.daemon.max_nodes.get(self.TIMEOUT)
530         self.assertIsNot(setup1, self.last_setup)
531         self.last_setup.stop_if_no_cloud_node().get.return_value = True
532         self.last_setup.stop_if_no_cloud_node.reset_mock()
533         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
534         self.daemon.max_nodes.get(self.TIMEOUT)
535         self.stop_proxy(self.daemon)
536         self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
537         self.assertTrue(setup1.stop_if_no_cloud_node.called)
538
539     def test_shutdown_declined_at_wishlist_capacity(self):
540         cloud_node = testutil.cloud_node_mock(1)
541         arv_node = testutil.arvados_node_mock(1)
542         size = testutil.MockSize(1)
543         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
544         self.busywait(lambda: 1 == self.paired_monitor_count())
545         monitor = self.monitor_list()[0].proxy()
546         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
547         self.stop_proxy(self.daemon)
548         self.assertFalse(self.node_shutdown.start.called)
549
550     def test_shutdown_declined_below_min_nodes(self):
551         cloud_node = testutil.cloud_node_mock(1)
552         arv_node = testutil.arvados_node_mock(1)
553         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
554         self.busywait(lambda: 1 == self.paired_monitor_count())
555         monitor = self.monitor_list()[0].proxy()
556         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
557         self.stop_proxy(self.daemon)
558         self.assertFalse(self.node_shutdown.start.called)
559
560     def test_shutdown_accepted_below_capacity(self):
561         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
562         self.busywait(lambda: 1 == self.alive_monitor_count())
563         monitor = self.monitor_list()[0].proxy()
564         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
565         self.busywait(lambda: self.node_shutdown.start.called)
566
567     def test_shutdown_declined_when_idle_and_job_queued(self):
568         size = testutil.MockSize(1)
569         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
570         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
571                      testutil.arvados_node_mock(4, job_uuid=None)]
572         self.make_daemon(cloud_nodes, arv_nodes, [size])
573         self.busywait(lambda: 2 == self.paired_monitor_count())
574         for mon_ref in self.monitor_list():
575             monitor = mon_ref.proxy()
576             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
577                 break
578         else:
579             self.fail("monitor for idle node not found")
580         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
581         self.stop_proxy(self.daemon)
582         self.assertFalse(self.node_shutdown.start.called)
583
584     def test_node_shutdown_after_cancelled_shutdown(self):
585         cloud_node = testutil.cloud_node_mock(5)
586         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
587         self.assertEqual(1, self.alive_monitor_count())
588         monitor = self.monitor_list()[0].proxy()
589         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
590         self.last_shutdown.success.get.return_value = False
591         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
592         self.busywait(lambda: 1 == self.paired_monitor_count())
593
594         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
595         self.last_shutdown.success.get.return_value = True
596         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
597         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
598         self.busywait(lambda: 0 == self.paired_monitor_count())
599
600     def test_nodes_shutting_down_replaced_below_max_nodes(self):
601         size = testutil.MockSize(6)
602         cloud_node = testutil.cloud_node_mock(6, size=size)
603         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
604                          avail_sizes=[(size, {"cores":1})])
605         self.assertEqual(1, self.alive_monitor_count())
606         monitor = self.monitor_list()[0].proxy()
607         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
608         self.assertTrue(self.node_shutdown.start.called)
609         self.daemon.update_server_wishlist(
610             [testutil.MockSize(6)]).get(self.TIMEOUT)
611         self.busywait(lambda: self.node_setup.start.called)
612
613     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
614         cloud_node = testutil.cloud_node_mock(7)
615         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
616                          max_nodes=1)
617         self.busywait(lambda: 1 == self.paired_monitor_count())
618         monitor = self.monitor_list()[0].proxy()
619         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
620         self.assertTrue(self.node_shutdown.start.called)
621         self.daemon.update_server_wishlist(
622             [testutil.MockSize(7)]).get(self.TIMEOUT)
623         self.busywait(lambda: not self.node_setup.start.called)
624
625     def test_nodes_shutting_down_count_against_excess(self):
626         size = testutil.MockSize(8)
627         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
628         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
629         self.make_daemon(cloud_nodes, arv_nodes, [size],
630                          avail_sizes=[(size, {"cores":1})])
631         self.busywait(lambda: 2 == self.paired_monitor_count())
632         for mon_ref in self.monitor_list():
633             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
634         self.assertEqual(1, self.node_shutdown.start.call_count)
635
636     def test_clean_shutdown_waits_for_node_setup_finish(self):
637         new_node = self.start_node_boot()
638         new_node.stop_if_no_cloud_node().get.return_value = False
639         new_node.stop_if_no_cloud_node.reset_mock()
640         self.daemon.shutdown().get(self.TIMEOUT)
641         self.assertTrue(new_node.stop_if_no_cloud_node.called)
642         self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
643         self.assertTrue(new_node.stop.called)
644         self.timer.deliver()
645         self.assertTrue(
646             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
647
648     def test_wishlist_ignored_after_shutdown(self):
649         new_node = self.start_node_boot()
650         new_node.stop_if_no_cloud_node().get.return_value = False
651         new_node.stop_if_no_cloud_node.reset_mock()
652         self.daemon.shutdown().get(self.TIMEOUT)
653         size = testutil.MockSize(2)
654         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
655         self.timer.deliver()
656         self.busywait(lambda: 1 == self.node_setup.start.call_count)
657
658     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
659         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
660         self.assertEqual(1, self.alive_monitor_count())
661         monitor = self.monitor_list()[0].proxy()
662         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
663         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
664         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
665
666     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
667         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
668         self.assertEqual(1, self.alive_monitor_count())
669         monitor = self.monitor_list()[0].proxy()
670         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
671         # We're mainly testing that update_cloud_nodes catches and handles
672         # the ActorDeadError.
673         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
674         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
675         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
676
677     def test_node_create_two_sizes(self):
678         small = testutil.MockSize(1)
679         big = testutil.MockSize(2)
680         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
681                         (testutil.MockSize(2), {"cores":2})]
682         self.make_daemon(want_sizes=[small, small, small, big],
683                          avail_sizes=avail_sizes, max_nodes=4)
684
685         # the daemon runs in another thread, so we need to wait and see
686         # if it does all the work we're expecting it to do before stopping it.
687         self.busywait(lambda: self.node_setup.start.call_count == 4)
688         booting = self.daemon.booting.get(self.TIMEOUT)
689         self.stop_proxy(self.daemon)
690         sizecounts = {a[0].id: 0 for a in avail_sizes}
691         for b in booting.itervalues():
692             sizecounts[b.cloud_size.get().id] += 1
693         logging.info(sizecounts)
694         self.assertEqual(3, sizecounts[small.id])
695         self.assertEqual(1, sizecounts[big.id])
696
697     def test_node_max_nodes_two_sizes(self):
698         small = testutil.MockSize(1)
699         big = testutil.MockSize(2)
700         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
701                         (testutil.MockSize(2), {"cores":2})]
702         self.make_daemon(want_sizes=[small, small, small, big],
703                          avail_sizes=avail_sizes, max_nodes=3)
704
705         # the daemon runs in another thread, so we need to wait and see
706         # if it does all the work we're expecting it to do before stopping it.
707         self.busywait(lambda: self.node_setup.start.call_count == 3)
708         booting = self.daemon.booting.get(self.TIMEOUT)
709         self.stop_proxy(self.daemon)
710         sizecounts = {a[0].id: 0 for a in avail_sizes}
711         for b in booting.itervalues():
712             sizecounts[b.cloud_size.get().id] += 1
713         self.assertEqual(2, sizecounts[small.id])
714         self.assertEqual(1, sizecounts[big.id])
715
716     def test_wishlist_reconfigure(self):
717         small = testutil.MockSize(1)
718         big = testutil.MockSize(2)
719         avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
720
721         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
722                                       testutil.cloud_node_mock(2, small),
723                                       testutil.cloud_node_mock(3, big)],
724                          arvados_nodes=[testutil.arvados_node_mock(1),
725                                         testutil.arvados_node_mock(2),
726                                         testutil.arvados_node_mock(3)],
727                          want_sizes=[small, small, big],
728                          avail_sizes=avail_sizes)
729         self.busywait(lambda: 3 == self.paired_monitor_count())
730         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
731
732         self.assertEqual(0, self.node_shutdown.start.call_count)
733
734         for c in self.daemon.cloud_nodes.get().nodes.itervalues():
735             self.daemon.node_can_shutdown(c.actor)
736
737         booting = self.daemon.booting.get()
738         cloud_nodes = self.daemon.cloud_nodes.get()
739
740         self.busywait(lambda: 1 == self.node_setup.start.call_count)
741         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
742
743         self.stop_proxy(self.daemon)
744
745         # booting a new big node
746         sizecounts = {a[0].id: 0 for a in avail_sizes}
747         for b in booting.itervalues():
748             sizecounts[b.cloud_size.get().id] += 1
749         self.assertEqual(0, sizecounts[small.id])
750         self.assertEqual(1, sizecounts[big.id])
751
752         # shutting down a small node
753         sizecounts = {a[0].id: 0 for a in avail_sizes}
754         for b in cloud_nodes.nodes.itervalues():
755             if b.shutdown_actor is not None:
756                 sizecounts[b.cloud_node.size.id] += 1
757         self.assertEqual(1, sizecounts[small.id])
758         self.assertEqual(0, sizecounts[big.id])
759
760     def test_node_max_price(self):
761         small = testutil.MockSize(1)
762         big = testutil.MockSize(2)
763         avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
764                         (testutil.MockSize(2), {"cores":2, "price":2})]
765         self.make_daemon(want_sizes=[small, small, small, big],
766                          avail_sizes=avail_sizes,
767                          max_nodes=4,
768                          max_total_price=4)
769         # the daemon runs in another thread, so we need to wait and see
770         # if it does all the work we're expecting it to do before stopping it.
771         self.busywait(lambda: self.node_setup.start.call_count == 3)
772         booting = self.daemon.booting.get()
773         self.stop_proxy(self.daemon)
774
775         sizecounts = {a[0].id: 0 for a in avail_sizes}
776         for b in booting.itervalues():
777             sizecounts[b.cloud_size.get().id] += 1
778         logging.info(sizecounts)
779
780         # Booting 3 small nodes and not booting a big node would also partially
781         # satisfy the wishlist and come in under the price cap, however the way
782         # the update_server_wishlist() currently works effectively results in a
783         # round-robin creation of one node of each size in the wishlist, so
784         # test for that.
785         self.assertEqual(2, sizecounts[small.id])
786         self.assertEqual(1, sizecounts[big.id])