12085: Add node_quota status & count different cloud errors separately.
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import time
9 import unittest
10
11 import mock
12 import pykka
13
14 import arvnodeman.daemon as nmdaemon
15 import arvnodeman.status as status
16 from arvnodeman.jobqueue import ServerCalculator
17 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
18 from . import testutil
19 from . import test_status
20 import logging
21
22 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
23                                      unittest.TestCase):
24
25     def busywait(self, f):
26         for n in xrange(200):
27             ok = f()
28             if ok:
29                 return
30             time.sleep(.1)
31             self.daemon.ping().get(self.TIMEOUT)
32         self.assertTrue(ok) # always falsy, but not necessarily False
33
34     def mock_node_start(self, **kwargs):
35         # Make sure that every time the daemon starts a setup actor,
36         # it gets a new mock object back.
37         get_cloud_size = mock.MagicMock()
38         get_cloud_size.get.return_value = kwargs["cloud_size"]
39         mock_actor = mock.MagicMock()
40         mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
41                                           cloud_size=get_cloud_size,
42                                           actor_ref=mock_actor)
43         mock_actor.proxy.return_value = mock_proxy
44         mock_actor.tell_proxy.return_value = mock_proxy
45
46         self.last_setup = mock_proxy
47         return mock_actor
48
49     def mock_node_shutdown(self, **kwargs):
50         # Make sure that every time the daemon starts a shutdown actor,
51         # it gets a new mock object back.
52         get_cloud_node = mock.MagicMock()
53         if "node_monitor" in kwargs:
54             get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
55         mock_actor = mock.MagicMock()
56         mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
57                                           cloud_node=get_cloud_node,
58                                           actor_ref=mock_actor)
59
60         mock_actor.proxy.return_value = mock_proxy
61         self.last_shutdown = mock_proxy
62
63         return mock_actor
64
65     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
66                     avail_sizes=None,
67                     min_nodes=0, max_nodes=8,
68                     shutdown_windows=[54, 5, 1],
69                     max_total_price=None):
70         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
71             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
72
73         if not avail_sizes:
74             if cloud_nodes or want_sizes:
75                 avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
76             else:
77                 avail_sizes=[(testutil.MockSize(1), {"cores": 1})]
78
79         self.arv_factory = mock.MagicMock(name='arvados_mock')
80         api_client = mock.MagicMock(name='api_client')
81         api_client.nodes().create().execute.side_effect = \
82             [testutil.arvados_node_mock(1),
83              testutil.arvados_node_mock(2)]
84         self.arv_factory.return_value = api_client
85
86         self.cloud_factory = mock.MagicMock(name='cloud_mock')
87         self.cloud_factory().node_start_time.return_value = time.time()
88         self.cloud_updates = mock.MagicMock(name='updates_mock')
89         self.timer = testutil.MockTimer(deliver_immediately=False)
90         self.cloud_factory().node_id.side_effect = lambda node: node.id
91         self.cloud_factory().broken.return_value = False
92
93         self.node_setup = mock.MagicMock(name='setup_mock')
94         self.node_setup.start.side_effect = self.mock_node_start
95         self.node_setup.reset_mock()
96
97         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
98         self.node_shutdown.start.side_effect = self.mock_node_shutdown
99
100         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
101             self.server_wishlist_poller, self.arvados_nodes_poller,
102             self.cloud_nodes_poller, self.cloud_updates, self.timer,
103             self.arv_factory, self.cloud_factory,
104             shutdown_windows, ServerCalculator(avail_sizes),
105             min_nodes, max_nodes, 600, 1800, 3600,
106             self.node_setup, self.node_shutdown,
107             max_total_price=max_total_price).proxy()
108         if arvados_nodes is not None:
109             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
110         if cloud_nodes is not None:
111             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
112         if want_sizes is not None:
113             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
114
115     def monitor_list(self):
116         return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
117
118     def monitored_arvados_nodes(self, include_unpaired=True):
119         pairings = []
120         for future in [actor.proxy().arvados_node
121                        for actor in self.monitor_list()]:
122             try:
123                 g = future.get(self.TIMEOUT)
124                 if g or include_unpaired:
125                     pairings.append(g)
126             except pykka.ActorDeadError:
127                 pass
128         return pairings
129
130     def alive_monitor_count(self):
131         return len(self.monitored_arvados_nodes())
132
133     def paired_monitor_count(self):
134         return len(self.monitored_arvados_nodes(False))
135
136     def assertShutdownCancellable(self, expected=True):
137         self.assertTrue(self.node_shutdown.start.called)
138         self.assertIs(expected,
139                       self.node_shutdown.start.call_args[1]['cancellable'],
140                       "ComputeNodeShutdownActor incorrectly cancellable")
141
142     def test_easy_node_creation(self):
143         size = testutil.MockSize(1)
144         self.make_daemon(want_sizes=[size])
145         self.busywait(lambda: self.node_setup.start.called)
146         self.assertIn('node_quota', status.tracker._latest)
147
148     def check_monitors_arvados_nodes(self, *arv_nodes):
149         self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
150         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
151
152     def test_node_pairing(self):
153         cloud_node = testutil.cloud_node_mock(1)
154         arv_node = testutil.arvados_node_mock(1)
155         self.make_daemon([cloud_node], [arv_node])
156         self.check_monitors_arvados_nodes(arv_node)
157
158     def test_node_pairing_after_arvados_update(self):
159         cloud_node = testutil.cloud_node_mock(2)
160         self.make_daemon([cloud_node],
161                          [testutil.arvados_node_mock(1, ip_address=None)])
162         arv_node = testutil.arvados_node_mock(2)
163         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
164         self.check_monitors_arvados_nodes(arv_node)
165
166     def test_arvados_node_un_and_re_paired(self):
167         # We need to create the Arvados node mock after spinning up the daemon
168         # to make sure it's new enough to pair with the cloud node.
169         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
170                          arvados_nodes=None)
171         arv_node = testutil.arvados_node_mock(3)
172         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
173         self.check_monitors_arvados_nodes(arv_node)
174         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
175         self.busywait(lambda: 0 == self.alive_monitor_count())
176         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
177         self.check_monitors_arvados_nodes(arv_node)
178
179     def test_old_arvados_node_not_double_assigned(self):
180         arv_node = testutil.arvados_node_mock(3, age=9000)
181         size = testutil.MockSize(3)
182         self.make_daemon(arvados_nodes=[arv_node],
183                          avail_sizes=[(size, {"cores":1})])
184         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
185         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
186         self.stop_proxy(self.daemon)
187         used_nodes = [call[1].get('arvados_node')
188                       for call in self.node_setup.start.call_args_list]
189         self.assertEqual(2, len(used_nodes))
190         self.assertIn(arv_node, used_nodes)
191         self.assertIn(None, used_nodes)
192
193     def test_node_count_satisfied(self):
194         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
195                          want_sizes=[testutil.MockSize(1)])
196         self.busywait(lambda: not self.node_setup.start.called)
197
198     def test_select_stale_node_records_with_slot_numbers_first(self):
199         """
200         Stale node records with slot_number assigned can exist when
201         clean_arvados_node() isn't executed after a node shutdown, for
202         various reasons.
203         NodeManagerDaemonActor should use these stale node records first, so
204         that they don't accumulate unused, reducing the slots available.
205         """
206         size = testutil.MockSize(1)
207         a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
208         arvados_nodes = []
209         for n in range(9):
210             # Add several stale node records without slot_number assigned
211             arvados_nodes.append(
212                 testutil.arvados_node_mock(
213                     n+1,
214                     slot_number=None,
215                     modified_at=a_long_time_ago))
216         # Add one record with stale_node assigned, it should be the
217         # first one selected
218         arv_node = testutil.arvados_node_mock(
219             123,
220             modified_at=a_long_time_ago)
221         arvados_nodes.append(arv_node)
222         cloud_node = testutil.cloud_node_mock(125, size=size)
223         self.make_daemon(cloud_nodes=[cloud_node],
224                          arvados_nodes=arvados_nodes)
225         arvados_nodes_tracker = self.daemon.arvados_nodes.get()
226         # Here, find_stale_node() should return the node record with
227         # the slot_number assigned.
228         self.assertEqual(arv_node,
229                          arvados_nodes_tracker.find_stale_node(3601))
230
231     def test_dont_count_missing_as_busy(self):
232         size = testutil.MockSize(1)
233         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
234                                       testutil.cloud_node_mock(2, size=size)],
235                          arvados_nodes=[testutil.arvados_node_mock(1),
236                                         testutil.arvados_node_mock(
237                                             2,
238                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
239                          want_sizes=[size, size])
240         self.busywait(lambda: 2 == self.alive_monitor_count())
241         self.busywait(lambda: self.node_setup.start.called)
242
243     def test_missing_counts_towards_max(self):
244         size = testutil.MockSize(1)
245         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
246                                       testutil.cloud_node_mock(2, size=size)],
247                          arvados_nodes=[testutil.arvados_node_mock(1),
248                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
249                          want_sizes=[size, size],
250                          max_nodes=2)
251         self.busywait(lambda: not self.node_setup.start.called)
252
253     def test_excess_counts_missing(self):
254         size = testutil.MockSize(1)
255         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
256         self.make_daemon(cloud_nodes=cloud_nodes,
257                          arvados_nodes=[testutil.arvados_node_mock(1),
258                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
259                          want_sizes=[size])
260         self.busywait(lambda: 2 == self.paired_monitor_count())
261         for mon_ref in self.monitor_list():
262             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
263         self.assertEqual(1, self.node_shutdown.start.call_count)
264
265     def test_missing_shutdown_not_excess(self):
266         size = testutil.MockSize(1)
267         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
268         self.make_daemon(cloud_nodes=cloud_nodes,
269                          arvados_nodes=[testutil.arvados_node_mock(1),
270                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
271                          want_sizes=[size])
272         self.busywait(lambda: 2 == self.paired_monitor_count())
273         get_cloud_node = mock.MagicMock(name="get_cloud_node")
274         get_cloud_node.get.return_value = cloud_nodes[1]
275         mock_node_monitor = mock.MagicMock()
276         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
277         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
278
279         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
280
281         self.busywait(lambda: 2 == self.alive_monitor_count())
282         for mon_ref in self.monitor_list():
283             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
284         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
285
286     def test_booting_nodes_counted(self):
287         cloud_node = testutil.cloud_node_mock(1)
288         arv_node = testutil.arvados_node_mock(1)
289         server_wishlist = [testutil.MockSize(1)] * 2
290         self.make_daemon([cloud_node], [arv_node], server_wishlist)
291         self.daemon.max_nodes.get(self.TIMEOUT)
292         self.assertTrue(self.node_setup.start.called)
293         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
294         self.busywait(lambda: 1 == self.node_setup.start.call_count)
295
296     def test_boot_new_node_when_all_nodes_busy(self):
297         size = testutil.MockSize(2)
298         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
299         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
300                          [size], avail_sizes=[(size, {"cores":1})])
301         self.busywait(lambda: 1 == self.paired_monitor_count())
302         self.busywait(lambda: self.node_setup.start.called)
303
304     def test_boot_new_node_below_min_nodes(self):
305         min_size = testutil.MockSize(1)
306         wish_size = testutil.MockSize(3)
307         avail_sizes = [(min_size, {"cores": 1}),
308                        (wish_size, {"cores": 3})]
309         self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
310         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
311         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
312         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
313         self.stop_proxy(self.daemon)
314         self.assertEqual([wish_size, min_size],
315                          [call[1].get('cloud_size')
316                           for call in self.node_setup.start.call_args_list])
317
318     def test_no_new_node_when_ge_min_nodes_busy(self):
319         size = testutil.MockSize(2)
320         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
321         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
322                      for n in range(1, 4)]
323         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
324         self.stop_proxy(self.daemon)
325         self.assertEqual(0, self.node_setup.start.call_count)
326
327     def test_no_new_node_when_max_nodes_busy(self):
328         size = testutil.MockSize(3)
329         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
330                          arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
331                          want_sizes=[size],
332                          max_nodes=1)
333         self.stop_proxy(self.daemon)
334         self.assertFalse(self.node_setup.start.called)
335
336     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
337         if cloud_node is None:
338             cloud_node = testutil.cloud_node_mock(id_num)
339         id_num = int(cloud_node.id)
340         if arv_node is None:
341             arv_node = testutil.arvados_node_mock(id_num)
342         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
343                          avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
344         self.daemon.max_nodes.get(self.TIMEOUT)
345         self.assertEqual(1, self.node_setup.start.call_count)
346         self.last_setup.cloud_node.get.return_value = cloud_node
347         self.last_setup.arvados_node.get.return_value = arv_node
348         return self.last_setup
349
350     def test_new_node_when_booted_node_not_usable(self):
351         cloud_node = testutil.cloud_node_mock(4)
352         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
353         setup = self.start_node_boot(cloud_node, arv_node)
354         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
355         self.assertEqual(1, self.alive_monitor_count())
356         self.daemon.update_arvados_nodes([arv_node])
357         self.daemon.update_cloud_nodes([cloud_node])
358         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
359         self.daemon.update_server_wishlist(
360             [testutil.MockSize(4)]).get(self.TIMEOUT)
361         self.stop_proxy(self.daemon)
362         self.assertEqual(2, self.node_setup.start.call_count)
363
364     def test_no_duplication_when_booting_node_listed_fast(self):
365         # Test that we don't start two ComputeNodeMonitorActors when
366         # we learn about a booting node through a listing before we
367         # get the "node up" message from CloudNodeSetupActor.
368         cloud_node = testutil.cloud_node_mock(1)
369         setup = self.start_node_boot(cloud_node)
370         self.daemon.update_cloud_nodes([cloud_node])
371         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
372         self.assertEqual(1, self.alive_monitor_count())
373
374     def test_no_duplication_when_booted_node_listed(self):
375         cloud_node = testutil.cloud_node_mock(2)
376         setup = self.start_node_boot(cloud_node, id_num=2)
377         self.daemon.node_setup_finished(setup)
378         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
379         self.assertEqual(1, self.alive_monitor_count())
380
381     def test_node_counted_after_boot_with_slow_listing(self):
382         # Test that, after we boot a compute node, we assume it exists
383         # even it doesn't appear in the listing (e.g., because of delays
384         # propagating tags).
385         setup = self.start_node_boot()
386         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
387         self.assertEqual(1, self.alive_monitor_count())
388         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
389         self.assertEqual(1, self.alive_monitor_count())
390
391     def test_booted_unlisted_node_counted(self):
392         setup = self.start_node_boot(id_num=1)
393         self.daemon.node_setup_finished(setup)
394         self.daemon.update_server_wishlist(
395             [testutil.MockSize(1)]).get(self.TIMEOUT)
396         self.stop_proxy(self.daemon)
397         self.assertEqual(1, self.node_setup.start.call_count)
398
399     def test_booted_node_can_shutdown(self):
400         setup = self.start_node_boot()
401         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
402         self.assertEqual(1, self.alive_monitor_count())
403         monitor = self.monitor_list()[0].proxy()
404         self.daemon.update_server_wishlist([])
405         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
406         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
407         self.stop_proxy(self.daemon)
408         self.assertTrue(self.node_shutdown.start.called,
409                         "daemon did not shut down booted node on offer")
410
411         with test_status.TestServer() as srv:
412             self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
413             self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
414             self.assertEqual(0, srv.get_status().get('nodes_wish', None))
415
416     def test_booted_node_lifecycle(self):
417         cloud_node = testutil.cloud_node_mock(6)
418         setup = self.start_node_boot(cloud_node, id_num=6)
419         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
420         self.assertEqual(1, self.alive_monitor_count())
421         monitor = self.monitor_list()[0].proxy()
422         self.daemon.update_server_wishlist([])
423         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
424         self.assertShutdownCancellable(True)
425         shutdown = self.node_shutdown.start().proxy()
426         shutdown.cloud_node.get.return_value = cloud_node
427         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
428         self.daemon.update_cloud_nodes([])
429         self.assertTrue(shutdown.stop.called,
430                         "shutdown actor not stopped after finishing")
431         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
432                         "monitor for booted node not stopped after shutdown")
433         self.daemon.update_server_wishlist(
434             [testutil.MockSize(2)]).get(self.TIMEOUT)
435         self.stop_proxy(self.daemon)
436         self.assertTrue(self.node_setup.start.called,
437                         "second node not started after booted node stopped")
438
439     def test_node_disappearing_during_shutdown(self):
440         cloud_node = testutil.cloud_node_mock(6)
441         setup = self.start_node_boot(cloud_node, id_num=6)
442         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
443         self.assertEqual(1, self.alive_monitor_count())
444         monitor = self.monitor_list()[0].proxy()
445         self.daemon.update_server_wishlist([])
446         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
447         self.assertShutdownCancellable(True)
448         shutdown = self.node_shutdown.start().proxy()
449         shutdown.cloud_node.get.return_value = cloud_node
450         # Simulate a successful but slow node destroy call: the cloud node
451         # list gets updated before the ShutdownActor finishes.
452         record = self.daemon.cloud_nodes.get().nodes.values()[0]
453         self.assertTrue(record.shutdown_actor is not None)
454         self.daemon.cloud_nodes.get().nodes.clear()
455         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
456         self.assertTrue(
457             record.shutdown_actor is not None,
458             "test was ineffective -- failed to simulate the race condition")
459
460     def test_booted_node_shut_down_when_never_listed(self):
461         setup = self.start_node_boot()
462         self.cloud_factory().node_start_time.return_value = time.time() - 3601
463         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
464         self.assertEqual(1, self.alive_monitor_count())
465         self.assertFalse(self.node_shutdown.start.called)
466         now = time.time()
467         self.monitor_list()[0].tell_proxy().consider_shutdown()
468         self.busywait(lambda: self.node_shutdown.start.called)
469         self.assertShutdownCancellable(False)
470
471     def test_booted_node_shut_down_when_never_paired(self):
472         cloud_node = testutil.cloud_node_mock(2)
473         setup = self.start_node_boot(cloud_node)
474         self.cloud_factory().node_start_time.return_value = time.time() - 3601
475         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
476         self.assertEqual(1, self.alive_monitor_count())
477         self.daemon.update_cloud_nodes([cloud_node])
478         self.monitor_list()[0].tell_proxy().consider_shutdown()
479         self.busywait(lambda: self.node_shutdown.start.called)
480         self.assertShutdownCancellable(False)
481
482     def test_booted_node_shut_down_when_never_working(self):
483         cloud_node = testutil.cloud_node_mock(4)
484         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
485         setup = self.start_node_boot(cloud_node, arv_node)
486         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
487         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
488         self.assertEqual(1, self.alive_monitor_count())
489         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
490         self.daemon.update_cloud_nodes([cloud_node])
491         self.busywait(lambda: self.node_shutdown.start.called)
492         self.assertShutdownCancellable(False)
493
494     def test_node_that_pairs_not_considered_failed_boot(self):
495         cloud_node = testutil.cloud_node_mock(3)
496         arv_node = testutil.arvados_node_mock(3)
497         setup = self.start_node_boot(cloud_node, arv_node)
498         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
499         self.assertEqual(1, self.alive_monitor_count())
500         self.daemon.update_cloud_nodes([cloud_node])
501         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
502         self.timer.deliver()
503         self.stop_proxy(self.daemon)
504         self.assertFalse(self.node_shutdown.start.called)
505
506     def test_node_that_pairs_busy_not_considered_failed_boot(self):
507         cloud_node = testutil.cloud_node_mock(5)
508         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
509         setup = self.start_node_boot(cloud_node, arv_node)
510         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
511         self.assertEqual(1, self.alive_monitor_count())
512         self.daemon.update_cloud_nodes([cloud_node])
513         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
514         self.timer.deliver()
515         self.stop_proxy(self.daemon)
516         self.assertFalse(self.node_shutdown.start.called)
517
518     def test_booting_nodes_shut_down(self):
519         self.make_daemon(want_sizes=[testutil.MockSize(1)])
520         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
521         self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
522
523     def test_all_booting_nodes_tried_to_shut_down(self):
524         size = testutil.MockSize(2)
525         self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
526         self.daemon.max_nodes.get(self.TIMEOUT)
527         setup1 = self.last_setup
528         setup1.stop_if_no_cloud_node().get.return_value = False
529         setup1.stop_if_no_cloud_node.reset_mock()
530         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
531         self.daemon.max_nodes.get(self.TIMEOUT)
532         self.assertIsNot(setup1, self.last_setup)
533         self.last_setup.stop_if_no_cloud_node().get.return_value = True
534         self.last_setup.stop_if_no_cloud_node.reset_mock()
535         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
536         self.daemon.max_nodes.get(self.TIMEOUT)
537         self.stop_proxy(self.daemon)
538         self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
539         self.assertTrue(setup1.stop_if_no_cloud_node.called)
540
541     def test_shutdown_declined_at_wishlist_capacity(self):
542         cloud_node = testutil.cloud_node_mock(1)
543         arv_node = testutil.arvados_node_mock(1)
544         size = testutil.MockSize(1)
545         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
546         self.busywait(lambda: 1 == self.paired_monitor_count())
547         monitor = self.monitor_list()[0].proxy()
548         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
549         self.stop_proxy(self.daemon)
550         self.assertFalse(self.node_shutdown.start.called)
551
552     def test_shutdown_declined_below_min_nodes(self):
553         cloud_node = testutil.cloud_node_mock(1)
554         arv_node = testutil.arvados_node_mock(1)
555         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
556         self.busywait(lambda: 1 == self.paired_monitor_count())
557         monitor = self.monitor_list()[0].proxy()
558         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
559         self.stop_proxy(self.daemon)
560         self.assertFalse(self.node_shutdown.start.called)
561
562     def test_shutdown_accepted_below_capacity(self):
563         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
564         self.busywait(lambda: 1 == self.alive_monitor_count())
565         monitor = self.monitor_list()[0].proxy()
566         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
567         self.busywait(lambda: self.node_shutdown.start.called)
568
569     def test_shutdown_declined_when_idle_and_job_queued(self):
570         size = testutil.MockSize(1)
571         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
572         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
573                      testutil.arvados_node_mock(4, job_uuid=None)]
574         self.make_daemon(cloud_nodes, arv_nodes, [size])
575         self.busywait(lambda: 2 == self.paired_monitor_count())
576         for mon_ref in self.monitor_list():
577             monitor = mon_ref.proxy()
578             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
579                 break
580         else:
581             self.fail("monitor for idle node not found")
582         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
583         self.stop_proxy(self.daemon)
584         self.assertFalse(self.node_shutdown.start.called)
585
586     def test_node_shutdown_after_cancelled_shutdown(self):
587         cloud_node = testutil.cloud_node_mock(5)
588         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
589         self.assertEqual(1, self.alive_monitor_count())
590         monitor = self.monitor_list()[0].proxy()
591         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
592         self.last_shutdown.success.get.return_value = False
593         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
594         self.busywait(lambda: 1 == self.paired_monitor_count())
595
596         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
597         self.last_shutdown.success.get.return_value = True
598         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
599         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
600         self.busywait(lambda: 0 == self.paired_monitor_count())
601
602     def test_nodes_shutting_down_replaced_below_max_nodes(self):
603         size = testutil.MockSize(6)
604         cloud_node = testutil.cloud_node_mock(6, size=size)
605         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
606                          avail_sizes=[(size, {"cores":1})])
607         self.assertEqual(1, self.alive_monitor_count())
608         monitor = self.monitor_list()[0].proxy()
609         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
610         self.assertTrue(self.node_shutdown.start.called)
611         self.daemon.update_server_wishlist(
612             [testutil.MockSize(6)]).get(self.TIMEOUT)
613         self.busywait(lambda: self.node_setup.start.called)
614
615     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
616         cloud_node = testutil.cloud_node_mock(7)
617         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
618                          max_nodes=1)
619         self.busywait(lambda: 1 == self.paired_monitor_count())
620         monitor = self.monitor_list()[0].proxy()
621         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
622         self.assertTrue(self.node_shutdown.start.called)
623         self.daemon.update_server_wishlist(
624             [testutil.MockSize(7)]).get(self.TIMEOUT)
625         self.busywait(lambda: not self.node_setup.start.called)
626
627     def test_nodes_shutting_down_count_against_excess(self):
628         size = testutil.MockSize(8)
629         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
630         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
631         self.make_daemon(cloud_nodes, arv_nodes, [size],
632                          avail_sizes=[(size, {"cores":1})])
633         self.busywait(lambda: 2 == self.paired_monitor_count())
634         for mon_ref in self.monitor_list():
635             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
636         self.assertEqual(1, self.node_shutdown.start.call_count)
637
638     def test_clean_shutdown_waits_for_node_setup_finish(self):
639         new_node = self.start_node_boot()
640         new_node.stop_if_no_cloud_node().get.return_value = False
641         new_node.stop_if_no_cloud_node.reset_mock()
642         self.daemon.shutdown().get(self.TIMEOUT)
643         self.assertTrue(new_node.stop_if_no_cloud_node.called)
644         self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
645         self.assertTrue(new_node.stop.called)
646         self.timer.deliver()
647         self.assertTrue(
648             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
649
650     def test_wishlist_ignored_after_shutdown(self):
651         new_node = self.start_node_boot()
652         new_node.stop_if_no_cloud_node().get.return_value = False
653         new_node.stop_if_no_cloud_node.reset_mock()
654         self.daemon.shutdown().get(self.TIMEOUT)
655         size = testutil.MockSize(2)
656         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
657         self.timer.deliver()
658         self.busywait(lambda: 1 == self.node_setup.start.call_count)
659
660     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
661         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
662         self.assertEqual(1, self.alive_monitor_count())
663         monitor = self.monitor_list()[0].proxy()
664         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
665         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
666         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
667
668     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
669         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
670         self.assertEqual(1, self.alive_monitor_count())
671         monitor = self.monitor_list()[0].proxy()
672         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
673         # We're mainly testing that update_cloud_nodes catches and handles
674         # the ActorDeadError.
675         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
676         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
677         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
678
679     def test_node_create_two_sizes(self):
680         small = testutil.MockSize(1)
681         big = testutil.MockSize(2)
682         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
683                         (testutil.MockSize(2), {"cores":2})]
684         self.make_daemon(want_sizes=[small, small, small, big],
685                          avail_sizes=avail_sizes, max_nodes=4)
686
687         # the daemon runs in another thread, so we need to wait and see
688         # if it does all the work we're expecting it to do before stopping it.
689         self.busywait(lambda: self.node_setup.start.call_count == 4)
690         booting = self.daemon.booting.get(self.TIMEOUT)
691         self.stop_proxy(self.daemon)
692         sizecounts = {a[0].id: 0 for a in avail_sizes}
693         for b in booting.itervalues():
694             sizecounts[b.cloud_size.get().id] += 1
695         logging.info(sizecounts)
696         self.assertEqual(3, sizecounts[small.id])
697         self.assertEqual(1, sizecounts[big.id])
698
699     def test_node_max_nodes_two_sizes(self):
700         small = testutil.MockSize(1)
701         big = testutil.MockSize(2)
702         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
703                         (testutil.MockSize(2), {"cores":2})]
704         self.make_daemon(want_sizes=[small, small, big, small],
705                          avail_sizes=avail_sizes, max_nodes=3)
706
707         # the daemon runs in another thread, so we need to wait and see
708         # if it does all the work we're expecting it to do before stopping it.
709         self.busywait(lambda: self.node_setup.start.call_count == 3)
710         booting = self.daemon.booting.get(self.TIMEOUT)
711         self.stop_proxy(self.daemon)
712         sizecounts = {a[0].id: 0 for a in avail_sizes}
713         for b in booting.itervalues():
714             sizecounts[b.cloud_size.get().id] += 1
715         self.assertEqual(2, sizecounts[small.id])
716         self.assertEqual(1, sizecounts[big.id])
717
718     def test_wishlist_ordering(self):
719         # Check that big nodes aren't prioritized; since #12199 containers are
720         # scheduled on specific node sizes.
721         small = testutil.MockSize(1)
722         big = testutil.MockSize(2)
723         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
724                         (testutil.MockSize(2), {"cores":2})]
725         self.make_daemon(want_sizes=[small, small, small, big],
726                          avail_sizes=avail_sizes, max_nodes=3)
727
728         # the daemon runs in another thread, so we need to wait and see
729         # if it does all the work we're expecting it to do before stopping it.
730         self.busywait(lambda: self.node_setup.start.call_count == 3)
731         booting = self.daemon.booting.get(self.TIMEOUT)
732         self.stop_proxy(self.daemon)
733         sizecounts = {a[0].id: 0 for a in avail_sizes}
734         for b in booting.itervalues():
735             sizecounts[b.cloud_size.get().id] += 1
736         self.assertEqual(3, sizecounts[small.id])
737         self.assertEqual(0, sizecounts[big.id])
738
739     def test_wishlist_reconfigure(self):
740         small = testutil.MockSize(1)
741         big = testutil.MockSize(2)
742         avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
743
744         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
745                                       testutil.cloud_node_mock(2, small),
746                                       testutil.cloud_node_mock(3, big)],
747                          arvados_nodes=[testutil.arvados_node_mock(1),
748                                         testutil.arvados_node_mock(2),
749                                         testutil.arvados_node_mock(3)],
750                          want_sizes=[small, small, big],
751                          avail_sizes=avail_sizes)
752         self.busywait(lambda: 3 == self.paired_monitor_count())
753         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
754
755         self.assertEqual(0, self.node_shutdown.start.call_count)
756
757         for c in self.daemon.cloud_nodes.get().nodes.itervalues():
758             self.daemon.node_can_shutdown(c.actor)
759
760         booting = self.daemon.booting.get()
761         cloud_nodes = self.daemon.cloud_nodes.get()
762
763         self.busywait(lambda: 1 == self.node_setup.start.call_count)
764         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
765
766         self.stop_proxy(self.daemon)
767
768         # booting a new big node
769         sizecounts = {a[0].id: 0 for a in avail_sizes}
770         for b in booting.itervalues():
771             sizecounts[b.cloud_size.get().id] += 1
772         self.assertEqual(0, sizecounts[small.id])
773         self.assertEqual(1, sizecounts[big.id])
774
775         # shutting down a small node
776         sizecounts = {a[0].id: 0 for a in avail_sizes}
777         for b in cloud_nodes.nodes.itervalues():
778             if b.shutdown_actor is not None:
779                 sizecounts[b.cloud_node.size.id] += 1
780         self.assertEqual(1, sizecounts[small.id])
781         self.assertEqual(0, sizecounts[big.id])
782
783     def test_node_max_price(self):
784         small = testutil.MockSize(1)
785         big = testutil.MockSize(2)
786         avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
787                         (testutil.MockSize(2), {"cores":2, "price":2})]
788         self.make_daemon(want_sizes=[small, small, small, big],
789                          avail_sizes=avail_sizes,
790                          max_nodes=4,
791                          max_total_price=4)
792         # the daemon runs in another thread, so we need to wait and see
793         # if it does all the work we're expecting it to do before stopping it.
794         self.busywait(lambda: self.node_setup.start.call_count == 3)
795         booting = self.daemon.booting.get()
796         self.stop_proxy(self.daemon)
797
798         sizecounts = {a[0].id: 0 for a in avail_sizes}
799         for b in booting.itervalues():
800             sizecounts[b.cloud_size.get().id] += 1
801         logging.info(sizecounts)
802
803         # Booting 3 small nodes and not booting a big node would also partially
804         # satisfy the wishlist and come in under the price cap, however the way
805         # the update_server_wishlist() currently works effectively results in a
806         # round-robin creation of one node of each size in the wishlist, so
807         # test for that.
808         self.assertEqual(2, sizecounts[small.id])
809         self.assertEqual(1, sizecounts[big.id])