12199: Improve race handling in busywait.
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import time
9 import unittest
10
11 import mock
12 import pykka
13
14 import arvnodeman.daemon as nmdaemon
15 import arvnodeman.status as status
16 from arvnodeman.jobqueue import ServerCalculator
17 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
18 from . import testutil
19 from . import test_status
20 import logging
21
22 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
23                                      unittest.TestCase):
24
25     def busywait(self, f):
26         for n in xrange(200):
27             ok = f()
28             if ok:
29                 return
30             time.sleep(.1)
31             self.daemon.ping().get(self.TIMEOUT)
32         self.assertTrue(ok) # always falsy, but not necessarily False
33
34     def mock_node_start(self, **kwargs):
35         # Make sure that every time the daemon starts a setup actor,
36         # it gets a new mock object back.
37         get_cloud_size = mock.MagicMock()
38         get_cloud_size.get.return_value = kwargs["cloud_size"]
39         mock_actor = mock.MagicMock()
40         mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
41                                           cloud_size=get_cloud_size,
42                                           actor_ref=mock_actor)
43         mock_actor.proxy.return_value = mock_proxy
44         mock_actor.tell_proxy.return_value = mock_proxy
45
46         self.last_setup = mock_proxy
47         return mock_actor
48
49     def mock_node_shutdown(self, **kwargs):
50         # Make sure that every time the daemon starts a shutdown actor,
51         # it gets a new mock object back.
52         get_cloud_node = mock.MagicMock()
53         if "node_monitor" in kwargs:
54             get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
55         mock_actor = mock.MagicMock()
56         mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
57                                           cloud_node=get_cloud_node,
58                                           actor_ref=mock_actor)
59
60         mock_actor.proxy.return_value = mock_proxy
61         self.last_shutdown = mock_proxy
62
63         return mock_actor
64
65     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
66                     avail_sizes=None,
67                     min_nodes=0, max_nodes=8,
68                     shutdown_windows=[54, 5, 1],
69                     max_total_price=None):
70         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
71             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
72
73         if not avail_sizes:
74             if cloud_nodes or want_sizes:
75                 avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
76             else:
77                 avail_sizes=[(testutil.MockSize(1), {"cores": 1})]
78
79         self.arv_factory = mock.MagicMock(name='arvados_mock')
80         api_client = mock.MagicMock(name='api_client')
81         api_client.nodes().create().execute.side_effect = \
82             [testutil.arvados_node_mock(1),
83              testutil.arvados_node_mock(2)]
84         self.arv_factory.return_value = api_client
85
86         self.cloud_factory = mock.MagicMock(name='cloud_mock')
87         self.cloud_factory().node_start_time.return_value = time.time()
88         self.cloud_updates = mock.MagicMock(name='updates_mock')
89         self.timer = testutil.MockTimer(deliver_immediately=False)
90         self.cloud_factory().node_id.side_effect = lambda node: node.id
91         self.cloud_factory().broken.return_value = False
92
93         self.node_setup = mock.MagicMock(name='setup_mock')
94         self.node_setup.start.side_effect = self.mock_node_start
95         self.node_setup.reset_mock()
96
97         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
98         self.node_shutdown.start.side_effect = self.mock_node_shutdown
99
100         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
101             self.server_wishlist_poller, self.arvados_nodes_poller,
102             self.cloud_nodes_poller, self.cloud_updates, self.timer,
103             self.arv_factory, self.cloud_factory,
104             shutdown_windows, ServerCalculator(avail_sizes),
105             min_nodes, max_nodes, 600, 1800, 3600,
106             self.node_setup, self.node_shutdown,
107             max_total_price=max_total_price).proxy()
108         if arvados_nodes is not None:
109             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
110         if cloud_nodes is not None:
111             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
112         if want_sizes is not None:
113             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
114
115     def monitor_list(self):
116         return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
117
118     def monitored_arvados_nodes(self, include_unpaired=True):
119         pairings = []
120         for future in [actor.proxy().arvados_node
121                        for actor in self.monitor_list()]:
122             try:
123                 g = future.get(self.TIMEOUT)
124                 if g or include_unpaired:
125                     pairings.append(g)
126             except pykka.ActorDeadError:
127                 pass
128         return pairings
129
130     def alive_monitor_count(self):
131         return len(self.monitored_arvados_nodes())
132
133     def paired_monitor_count(self):
134         return len(self.monitored_arvados_nodes(False))
135
136     def assertShutdownCancellable(self, expected=True):
137         self.assertTrue(self.node_shutdown.start.called)
138         self.assertIs(expected,
139                       self.node_shutdown.start.call_args[1]['cancellable'],
140                       "ComputeNodeShutdownActor incorrectly cancellable")
141
142     def test_easy_node_creation(self):
143         size = testutil.MockSize(1)
144         self.make_daemon(want_sizes=[size])
145         self.busywait(lambda: self.node_setup.start.called)
146
147     def check_monitors_arvados_nodes(self, *arv_nodes):
148         self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
149         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
150
151     def test_node_pairing(self):
152         cloud_node = testutil.cloud_node_mock(1)
153         arv_node = testutil.arvados_node_mock(1)
154         self.make_daemon([cloud_node], [arv_node])
155         self.check_monitors_arvados_nodes(arv_node)
156
157     def test_node_pairing_after_arvados_update(self):
158         cloud_node = testutil.cloud_node_mock(2)
159         self.make_daemon([cloud_node],
160                          [testutil.arvados_node_mock(1, ip_address=None)])
161         arv_node = testutil.arvados_node_mock(2)
162         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
163         self.check_monitors_arvados_nodes(arv_node)
164
165     def test_arvados_node_un_and_re_paired(self):
166         # We need to create the Arvados node mock after spinning up the daemon
167         # to make sure it's new enough to pair with the cloud node.
168         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
169                          arvados_nodes=None)
170         arv_node = testutil.arvados_node_mock(3)
171         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
172         self.check_monitors_arvados_nodes(arv_node)
173         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
174         self.busywait(lambda: 0 == self.alive_monitor_count())
175         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
176         self.check_monitors_arvados_nodes(arv_node)
177
178     def test_old_arvados_node_not_double_assigned(self):
179         arv_node = testutil.arvados_node_mock(3, age=9000)
180         size = testutil.MockSize(3)
181         self.make_daemon(arvados_nodes=[arv_node],
182                          avail_sizes=[(size, {"cores":1})])
183         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
184         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
185         self.stop_proxy(self.daemon)
186         used_nodes = [call[1].get('arvados_node')
187                       for call in self.node_setup.start.call_args_list]
188         self.assertEqual(2, len(used_nodes))
189         self.assertIn(arv_node, used_nodes)
190         self.assertIn(None, used_nodes)
191
192     def test_node_count_satisfied(self):
193         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
194                          want_sizes=[testutil.MockSize(1)])
195         self.busywait(lambda: not self.node_setup.start.called)
196
197     def test_select_stale_node_records_with_slot_numbers_first(self):
198         """
199         Stale node records with slot_number assigned can exist when
200         clean_arvados_node() isn't executed after a node shutdown, for
201         various reasons.
202         NodeManagerDaemonActor should use these stale node records first, so
203         that they don't accumulate unused, reducing the slots available.
204         """
205         size = testutil.MockSize(1)
206         a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
207         arvados_nodes = []
208         for n in range(9):
209             # Add several stale node records without slot_number assigned
210             arvados_nodes.append(
211                 testutil.arvados_node_mock(
212                     n+1,
213                     slot_number=None,
214                     modified_at=a_long_time_ago))
215         # Add one record with stale_node assigned, it should be the
216         # first one selected
217         arv_node = testutil.arvados_node_mock(
218             123,
219             modified_at=a_long_time_ago)
220         arvados_nodes.append(arv_node)
221         cloud_node = testutil.cloud_node_mock(125, size=size)
222         self.make_daemon(cloud_nodes=[cloud_node],
223                          arvados_nodes=arvados_nodes)
224         arvados_nodes_tracker = self.daemon.arvados_nodes.get()
225         # Here, find_stale_node() should return the node record with
226         # the slot_number assigned.
227         self.assertEqual(arv_node,
228                          arvados_nodes_tracker.find_stale_node(3601))
229
230     def test_dont_count_missing_as_busy(self):
231         size = testutil.MockSize(1)
232         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
233                                       testutil.cloud_node_mock(2, size=size)],
234                          arvados_nodes=[testutil.arvados_node_mock(1),
235                                         testutil.arvados_node_mock(
236                                             2,
237                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
238                          want_sizes=[size, size])
239         self.busywait(lambda: 2 == self.alive_monitor_count())
240         self.busywait(lambda: self.node_setup.start.called)
241
242     def test_missing_counts_towards_max(self):
243         size = testutil.MockSize(1)
244         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
245                                       testutil.cloud_node_mock(2, size=size)],
246                          arvados_nodes=[testutil.arvados_node_mock(1),
247                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
248                          want_sizes=[size, size],
249                          max_nodes=2)
250         self.busywait(lambda: not self.node_setup.start.called)
251
252     def test_excess_counts_missing(self):
253         size = testutil.MockSize(1)
254         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
255         self.make_daemon(cloud_nodes=cloud_nodes,
256                          arvados_nodes=[testutil.arvados_node_mock(1),
257                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
258                          want_sizes=[size])
259         self.busywait(lambda: 2 == self.paired_monitor_count())
260         for mon_ref in self.monitor_list():
261             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
262         self.assertEqual(1, self.node_shutdown.start.call_count)
263
264     def test_missing_shutdown_not_excess(self):
265         size = testutil.MockSize(1)
266         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
267         self.make_daemon(cloud_nodes=cloud_nodes,
268                          arvados_nodes=[testutil.arvados_node_mock(1),
269                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
270                          want_sizes=[size])
271         self.busywait(lambda: 2 == self.paired_monitor_count())
272         get_cloud_node = mock.MagicMock(name="get_cloud_node")
273         get_cloud_node.get.return_value = cloud_nodes[1]
274         mock_node_monitor = mock.MagicMock()
275         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
276         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
277
278         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
279
280         self.busywait(lambda: 2 == self.alive_monitor_count())
281         for mon_ref in self.monitor_list():
282             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
283         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
284
285     def test_booting_nodes_counted(self):
286         cloud_node = testutil.cloud_node_mock(1)
287         arv_node = testutil.arvados_node_mock(1)
288         server_wishlist = [testutil.MockSize(1)] * 2
289         self.make_daemon([cloud_node], [arv_node], server_wishlist)
290         self.daemon.max_nodes.get(self.TIMEOUT)
291         self.assertTrue(self.node_setup.start.called)
292         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
293         self.busywait(lambda: 1 == self.node_setup.start.call_count)
294
295     def test_boot_new_node_when_all_nodes_busy(self):
296         size = testutil.MockSize(2)
297         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
298         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
299                          [size], avail_sizes=[(size, {"cores":1})])
300         self.busywait(lambda: 1 == self.paired_monitor_count())
301         self.busywait(lambda: self.node_setup.start.called)
302
303     def test_boot_new_node_below_min_nodes(self):
304         min_size = testutil.MockSize(1)
305         wish_size = testutil.MockSize(3)
306         avail_sizes = [(min_size, {"cores": 1}),
307                        (wish_size, {"cores": 3})]
308         self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
309         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
310         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
311         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
312         self.stop_proxy(self.daemon)
313         self.assertEqual([wish_size, min_size],
314                          [call[1].get('cloud_size')
315                           for call in self.node_setup.start.call_args_list])
316
317     def test_no_new_node_when_ge_min_nodes_busy(self):
318         size = testutil.MockSize(2)
319         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
320         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
321                      for n in range(1, 4)]
322         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
323         self.stop_proxy(self.daemon)
324         self.assertEqual(0, self.node_setup.start.call_count)
325
326     def test_no_new_node_when_max_nodes_busy(self):
327         size = testutil.MockSize(3)
328         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
329                          arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
330                          want_sizes=[size],
331                          max_nodes=1)
332         self.stop_proxy(self.daemon)
333         self.assertFalse(self.node_setup.start.called)
334
335     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
336         if cloud_node is None:
337             cloud_node = testutil.cloud_node_mock(id_num)
338         id_num = int(cloud_node.id)
339         if arv_node is None:
340             arv_node = testutil.arvados_node_mock(id_num)
341         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
342                          avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
343         self.daemon.max_nodes.get(self.TIMEOUT)
344         self.assertEqual(1, self.node_setup.start.call_count)
345         self.last_setup.cloud_node.get.return_value = cloud_node
346         self.last_setup.arvados_node.get.return_value = arv_node
347         return self.last_setup
348
349     def test_new_node_when_booted_node_not_usable(self):
350         cloud_node = testutil.cloud_node_mock(4)
351         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
352         setup = self.start_node_boot(cloud_node, arv_node)
353         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
354         self.assertEqual(1, self.alive_monitor_count())
355         self.daemon.update_arvados_nodes([arv_node])
356         self.daemon.update_cloud_nodes([cloud_node])
357         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
358         self.daemon.update_server_wishlist(
359             [testutil.MockSize(4)]).get(self.TIMEOUT)
360         self.stop_proxy(self.daemon)
361         self.assertEqual(2, self.node_setup.start.call_count)
362
363     def test_no_duplication_when_booting_node_listed_fast(self):
364         # Test that we don't start two ComputeNodeMonitorActors when
365         # we learn about a booting node through a listing before we
366         # get the "node up" message from CloudNodeSetupActor.
367         cloud_node = testutil.cloud_node_mock(1)
368         setup = self.start_node_boot(cloud_node)
369         self.daemon.update_cloud_nodes([cloud_node])
370         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
371         self.assertEqual(1, self.alive_monitor_count())
372
373     def test_no_duplication_when_booted_node_listed(self):
374         cloud_node = testutil.cloud_node_mock(2)
375         setup = self.start_node_boot(cloud_node, id_num=2)
376         self.daemon.node_setup_finished(setup)
377         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
378         self.assertEqual(1, self.alive_monitor_count())
379
380     def test_node_counted_after_boot_with_slow_listing(self):
381         # Test that, after we boot a compute node, we assume it exists
382         # even it doesn't appear in the listing (e.g., because of delays
383         # propagating tags).
384         setup = self.start_node_boot()
385         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
386         self.assertEqual(1, self.alive_monitor_count())
387         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
388         self.assertEqual(1, self.alive_monitor_count())
389
390     def test_booted_unlisted_node_counted(self):
391         setup = self.start_node_boot(id_num=1)
392         self.daemon.node_setup_finished(setup)
393         self.daemon.update_server_wishlist(
394             [testutil.MockSize(1)]).get(self.TIMEOUT)
395         self.stop_proxy(self.daemon)
396         self.assertEqual(1, self.node_setup.start.call_count)
397
398     def test_booted_node_can_shutdown(self):
399         setup = self.start_node_boot()
400         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
401         self.assertEqual(1, self.alive_monitor_count())
402         monitor = self.monitor_list()[0].proxy()
403         self.daemon.update_server_wishlist([])
404         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
405         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
406         self.stop_proxy(self.daemon)
407         self.assertTrue(self.node_shutdown.start.called,
408                         "daemon did not shut down booted node on offer")
409
410         with test_status.TestServer() as srv:
411             self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
412             self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
413             self.assertEqual(0, srv.get_status().get('nodes_wish', None))
414
415     def test_booted_node_lifecycle(self):
416         cloud_node = testutil.cloud_node_mock(6)
417         setup = self.start_node_boot(cloud_node, id_num=6)
418         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
419         self.assertEqual(1, self.alive_monitor_count())
420         monitor = self.monitor_list()[0].proxy()
421         self.daemon.update_server_wishlist([])
422         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
423         self.assertShutdownCancellable(True)
424         shutdown = self.node_shutdown.start().proxy()
425         shutdown.cloud_node.get.return_value = cloud_node
426         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
427         self.daemon.update_cloud_nodes([])
428         self.assertTrue(shutdown.stop.called,
429                         "shutdown actor not stopped after finishing")
430         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
431                         "monitor for booted node not stopped after shutdown")
432         self.daemon.update_server_wishlist(
433             [testutil.MockSize(2)]).get(self.TIMEOUT)
434         self.stop_proxy(self.daemon)
435         self.assertTrue(self.node_setup.start.called,
436                         "second node not started after booted node stopped")
437
438     def test_node_disappearing_during_shutdown(self):
439         cloud_node = testutil.cloud_node_mock(6)
440         setup = self.start_node_boot(cloud_node, id_num=6)
441         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
442         self.assertEqual(1, self.alive_monitor_count())
443         monitor = self.monitor_list()[0].proxy()
444         self.daemon.update_server_wishlist([])
445         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
446         self.assertShutdownCancellable(True)
447         shutdown = self.node_shutdown.start().proxy()
448         shutdown.cloud_node.get.return_value = cloud_node
449         # Simulate a successful but slow node destroy call: the cloud node
450         # list gets updated before the ShutdownActor finishes.
451         record = self.daemon.cloud_nodes.get().nodes.values()[0]
452         self.assertTrue(record.shutdown_actor is not None)
453         self.daemon.cloud_nodes.get().nodes.clear()
454         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
455         self.assertTrue(
456             record.shutdown_actor is not None,
457             "test was ineffective -- failed to simulate the race condition")
458
459     def test_booted_node_shut_down_when_never_listed(self):
460         setup = self.start_node_boot()
461         self.cloud_factory().node_start_time.return_value = time.time() - 3601
462         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
463         self.assertEqual(1, self.alive_monitor_count())
464         self.assertFalse(self.node_shutdown.start.called)
465         now = time.time()
466         self.monitor_list()[0].tell_proxy().consider_shutdown()
467         self.busywait(lambda: self.node_shutdown.start.called)
468         self.assertShutdownCancellable(False)
469
470     def test_booted_node_shut_down_when_never_paired(self):
471         cloud_node = testutil.cloud_node_mock(2)
472         setup = self.start_node_boot(cloud_node)
473         self.cloud_factory().node_start_time.return_value = time.time() - 3601
474         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
475         self.assertEqual(1, self.alive_monitor_count())
476         self.daemon.update_cloud_nodes([cloud_node])
477         self.monitor_list()[0].tell_proxy().consider_shutdown()
478         self.busywait(lambda: self.node_shutdown.start.called)
479         self.assertShutdownCancellable(False)
480
481     def test_booted_node_shut_down_when_never_working(self):
482         cloud_node = testutil.cloud_node_mock(4)
483         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
484         setup = self.start_node_boot(cloud_node, arv_node)
485         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
486         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
487         self.assertEqual(1, self.alive_monitor_count())
488         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
489         self.daemon.update_cloud_nodes([cloud_node])
490         self.busywait(lambda: self.node_shutdown.start.called)
491         self.assertShutdownCancellable(False)
492
493     def test_node_that_pairs_not_considered_failed_boot(self):
494         cloud_node = testutil.cloud_node_mock(3)
495         arv_node = testutil.arvados_node_mock(3)
496         setup = self.start_node_boot(cloud_node, arv_node)
497         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
498         self.assertEqual(1, self.alive_monitor_count())
499         self.daemon.update_cloud_nodes([cloud_node])
500         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
501         self.timer.deliver()
502         self.stop_proxy(self.daemon)
503         self.assertFalse(self.node_shutdown.start.called)
504
505     def test_node_that_pairs_busy_not_considered_failed_boot(self):
506         cloud_node = testutil.cloud_node_mock(5)
507         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
508         setup = self.start_node_boot(cloud_node, arv_node)
509         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
510         self.assertEqual(1, self.alive_monitor_count())
511         self.daemon.update_cloud_nodes([cloud_node])
512         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
513         self.timer.deliver()
514         self.stop_proxy(self.daemon)
515         self.assertFalse(self.node_shutdown.start.called)
516
517     def test_booting_nodes_shut_down(self):
518         self.make_daemon(want_sizes=[testutil.MockSize(1)])
519         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
520         self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
521
522     def test_all_booting_nodes_tried_to_shut_down(self):
523         size = testutil.MockSize(2)
524         self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
525         self.daemon.max_nodes.get(self.TIMEOUT)
526         setup1 = self.last_setup
527         setup1.stop_if_no_cloud_node().get.return_value = False
528         setup1.stop_if_no_cloud_node.reset_mock()
529         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
530         self.daemon.max_nodes.get(self.TIMEOUT)
531         self.assertIsNot(setup1, self.last_setup)
532         self.last_setup.stop_if_no_cloud_node().get.return_value = True
533         self.last_setup.stop_if_no_cloud_node.reset_mock()
534         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
535         self.daemon.max_nodes.get(self.TIMEOUT)
536         self.stop_proxy(self.daemon)
537         self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
538         self.assertTrue(setup1.stop_if_no_cloud_node.called)
539
540     def test_shutdown_declined_at_wishlist_capacity(self):
541         cloud_node = testutil.cloud_node_mock(1)
542         arv_node = testutil.arvados_node_mock(1)
543         size = testutil.MockSize(1)
544         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
545         self.busywait(lambda: 1 == self.paired_monitor_count())
546         monitor = self.monitor_list()[0].proxy()
547         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
548         self.stop_proxy(self.daemon)
549         self.assertFalse(self.node_shutdown.start.called)
550
551     def test_shutdown_declined_below_min_nodes(self):
552         cloud_node = testutil.cloud_node_mock(1)
553         arv_node = testutil.arvados_node_mock(1)
554         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
555         self.busywait(lambda: 1 == self.paired_monitor_count())
556         monitor = self.monitor_list()[0].proxy()
557         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
558         self.stop_proxy(self.daemon)
559         self.assertFalse(self.node_shutdown.start.called)
560
561     def test_shutdown_accepted_below_capacity(self):
562         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
563         self.busywait(lambda: 1 == self.alive_monitor_count())
564         monitor = self.monitor_list()[0].proxy()
565         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
566         self.busywait(lambda: self.node_shutdown.start.called)
567
568     def test_shutdown_declined_when_idle_and_job_queued(self):
569         size = testutil.MockSize(1)
570         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
571         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
572                      testutil.arvados_node_mock(4, job_uuid=None)]
573         self.make_daemon(cloud_nodes, arv_nodes, [size])
574         self.busywait(lambda: 2 == self.paired_monitor_count())
575         for mon_ref in self.monitor_list():
576             monitor = mon_ref.proxy()
577             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
578                 break
579         else:
580             self.fail("monitor for idle node not found")
581         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
582         self.stop_proxy(self.daemon)
583         self.assertFalse(self.node_shutdown.start.called)
584
585     def test_node_shutdown_after_cancelled_shutdown(self):
586         cloud_node = testutil.cloud_node_mock(5)
587         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
588         self.assertEqual(1, self.alive_monitor_count())
589         monitor = self.monitor_list()[0].proxy()
590         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
591         self.last_shutdown.success.get.return_value = False
592         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
593         self.busywait(lambda: 1 == self.paired_monitor_count())
594
595         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
596         self.last_shutdown.success.get.return_value = True
597         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
598         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
599         self.busywait(lambda: 0 == self.paired_monitor_count())
600
601     def test_nodes_shutting_down_replaced_below_max_nodes(self):
602         size = testutil.MockSize(6)
603         cloud_node = testutil.cloud_node_mock(6, size=size)
604         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
605                          avail_sizes=[(size, {"cores":1})])
606         self.assertEqual(1, self.alive_monitor_count())
607         monitor = self.monitor_list()[0].proxy()
608         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
609         self.assertTrue(self.node_shutdown.start.called)
610         self.daemon.update_server_wishlist(
611             [testutil.MockSize(6)]).get(self.TIMEOUT)
612         self.busywait(lambda: self.node_setup.start.called)
613
614     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
615         cloud_node = testutil.cloud_node_mock(7)
616         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
617                          max_nodes=1)
618         self.busywait(lambda: 1 == self.paired_monitor_count())
619         monitor = self.monitor_list()[0].proxy()
620         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
621         self.assertTrue(self.node_shutdown.start.called)
622         self.daemon.update_server_wishlist(
623             [testutil.MockSize(7)]).get(self.TIMEOUT)
624         self.busywait(lambda: not self.node_setup.start.called)
625
626     def test_nodes_shutting_down_count_against_excess(self):
627         size = testutil.MockSize(8)
628         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
629         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
630         self.make_daemon(cloud_nodes, arv_nodes, [size],
631                          avail_sizes=[(size, {"cores":1})])
632         self.busywait(lambda: 2 == self.paired_monitor_count())
633         for mon_ref in self.monitor_list():
634             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
635         self.assertEqual(1, self.node_shutdown.start.call_count)
636
637     def test_clean_shutdown_waits_for_node_setup_finish(self):
638         new_node = self.start_node_boot()
639         new_node.stop_if_no_cloud_node().get.return_value = False
640         new_node.stop_if_no_cloud_node.reset_mock()
641         self.daemon.shutdown().get(self.TIMEOUT)
642         self.assertTrue(new_node.stop_if_no_cloud_node.called)
643         self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
644         self.assertTrue(new_node.stop.called)
645         self.timer.deliver()
646         self.assertTrue(
647             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
648
649     def test_wishlist_ignored_after_shutdown(self):
650         new_node = self.start_node_boot()
651         new_node.stop_if_no_cloud_node().get.return_value = False
652         new_node.stop_if_no_cloud_node.reset_mock()
653         self.daemon.shutdown().get(self.TIMEOUT)
654         size = testutil.MockSize(2)
655         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
656         self.timer.deliver()
657         self.busywait(lambda: 1 == self.node_setup.start.call_count)
658
659     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
660         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
661         self.assertEqual(1, self.alive_monitor_count())
662         monitor = self.monitor_list()[0].proxy()
663         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
664         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
665         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
666
667     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
668         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
669         self.assertEqual(1, self.alive_monitor_count())
670         monitor = self.monitor_list()[0].proxy()
671         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
672         # We're mainly testing that update_cloud_nodes catches and handles
673         # the ActorDeadError.
674         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
675         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
676         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
677
678     def test_node_create_two_sizes(self):
679         small = testutil.MockSize(1)
680         big = testutil.MockSize(2)
681         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
682                         (testutil.MockSize(2), {"cores":2})]
683         self.make_daemon(want_sizes=[small, small, small, big],
684                          avail_sizes=avail_sizes, max_nodes=4)
685
686         # the daemon runs in another thread, so we need to wait and see
687         # if it does all the work we're expecting it to do before stopping it.
688         self.busywait(lambda: self.node_setup.start.call_count == 4)
689         booting = self.daemon.booting.get(self.TIMEOUT)
690         self.stop_proxy(self.daemon)
691         sizecounts = {a[0].id: 0 for a in avail_sizes}
692         for b in booting.itervalues():
693             sizecounts[b.cloud_size.get().id] += 1
694         logging.info(sizecounts)
695         self.assertEqual(3, sizecounts[small.id])
696         self.assertEqual(1, sizecounts[big.id])
697
698     def test_node_max_nodes_two_sizes(self):
699         small = testutil.MockSize(1)
700         big = testutil.MockSize(2)
701         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
702                         (testutil.MockSize(2), {"cores":2})]
703         self.make_daemon(want_sizes=[small, small, small, big],
704                          avail_sizes=avail_sizes, max_nodes=3)
705
706         # the daemon runs in another thread, so we need to wait and see
707         # if it does all the work we're expecting it to do before stopping it.
708         self.busywait(lambda: self.node_setup.start.call_count == 3)
709         booting = self.daemon.booting.get(self.TIMEOUT)
710         self.stop_proxy(self.daemon)
711         sizecounts = {a[0].id: 0 for a in avail_sizes}
712         for b in booting.itervalues():
713             sizecounts[b.cloud_size.get().id] += 1
714         self.assertEqual(2, sizecounts[small.id])
715         self.assertEqual(1, sizecounts[big.id])
716
717     def test_wishlist_reconfigure(self):
718         small = testutil.MockSize(1)
719         big = testutil.MockSize(2)
720         avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
721
722         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
723                                       testutil.cloud_node_mock(2, small),
724                                       testutil.cloud_node_mock(3, big)],
725                          arvados_nodes=[testutil.arvados_node_mock(1),
726                                         testutil.arvados_node_mock(2),
727                                         testutil.arvados_node_mock(3)],
728                          want_sizes=[small, small, big],
729                          avail_sizes=avail_sizes)
730         self.busywait(lambda: 3 == self.paired_monitor_count())
731         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
732
733         self.assertEqual(0, self.node_shutdown.start.call_count)
734
735         for c in self.daemon.cloud_nodes.get().nodes.itervalues():
736             self.daemon.node_can_shutdown(c.actor)
737
738         booting = self.daemon.booting.get()
739         cloud_nodes = self.daemon.cloud_nodes.get()
740
741         self.busywait(lambda: 1 == self.node_setup.start.call_count)
742         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
743
744         self.stop_proxy(self.daemon)
745
746         # booting a new big node
747         sizecounts = {a[0].id: 0 for a in avail_sizes}
748         for b in booting.itervalues():
749             sizecounts[b.cloud_size.get().id] += 1
750         self.assertEqual(0, sizecounts[small.id])
751         self.assertEqual(1, sizecounts[big.id])
752
753         # shutting down a small node
754         sizecounts = {a[0].id: 0 for a in avail_sizes}
755         for b in cloud_nodes.nodes.itervalues():
756             if b.shutdown_actor is not None:
757                 sizecounts[b.cloud_node.size.id] += 1
758         self.assertEqual(1, sizecounts[small.id])
759         self.assertEqual(0, sizecounts[big.id])
760
761     def test_node_max_price(self):
762         small = testutil.MockSize(1)
763         big = testutil.MockSize(2)
764         avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
765                         (testutil.MockSize(2), {"cores":2, "price":2})]
766         self.make_daemon(want_sizes=[small, small, small, big],
767                          avail_sizes=avail_sizes,
768                          max_nodes=4,
769                          max_total_price=4)
770         # the daemon runs in another thread, so we need to wait and see
771         # if it does all the work we're expecting it to do before stopping it.
772         self.busywait(lambda: self.node_setup.start.call_count == 3)
773         booting = self.daemon.booting.get()
774         self.stop_proxy(self.daemon)
775
776         sizecounts = {a[0].id: 0 for a in avail_sizes}
777         for b in booting.itervalues():
778             sizecounts[b.cloud_size.get().id] += 1
779         logging.info(sizecounts)
780
781         # Booting 3 small nodes and not booting a big node would also partially
782         # satisfy the wishlist and come in under the price cap, however the way
783         # the update_server_wishlist() currently works effectively results in a
784         # round-robin creation of one node of each size in the wishlist, so
785         # test for that.
786         self.assertEqual(2, sizecounts[small.id])
787         self.assertEqual(1, sizecounts[big.id])