Merge branch '13164-fix-zero-priority-after-race'
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import time
9 import unittest
10
11 import mock
12 import pykka
13
14 import arvnodeman.daemon as nmdaemon
15 import arvnodeman.status as status
16 from arvnodeman.jobqueue import ServerCalculator
17 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
18 from . import testutil
19 from . import test_status
20 from . import pykka_timeout
21 import logging
22
23 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
24                                      unittest.TestCase):
25
26     def assertwait(self, f, timeout=pykka_timeout*2):
27         deadline = time.time() + timeout
28         while True:
29             try:
30                 return f()
31             except AssertionError:
32                 if time.time() > deadline:
33                     raise
34                 pass
35             time.sleep(.1)
36             self.daemon.ping().get(self.TIMEOUT)
37
38     def busywait(self, f):
39         for n in xrange(200):
40             ok = f()
41             if ok:
42                 return
43             time.sleep(.1)
44             self.daemon.ping().get(self.TIMEOUT)
45         self.assertTrue(ok) # always falsy, but not necessarily False
46
47     def mock_node_start(self, **kwargs):
48         # Make sure that every time the daemon starts a setup actor,
49         # it gets a new mock object back.
50         get_cloud_size = mock.MagicMock()
51         get_cloud_size.get.return_value = kwargs["cloud_size"]
52         mock_actor = mock.MagicMock()
53         mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
54                                           cloud_size=get_cloud_size,
55                                           actor_ref=mock_actor)
56         mock_actor.proxy.return_value = mock_proxy
57         mock_actor.tell_proxy.return_value = mock_proxy
58
59         self.last_setup = mock_proxy
60         return mock_actor
61
62     def mock_node_shutdown(self, **kwargs):
63         # Make sure that every time the daemon starts a shutdown actor,
64         # it gets a new mock object back.
65         get_cloud_node = mock.MagicMock()
66         if "node_monitor" in kwargs:
67             get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
68         mock_actor = mock.MagicMock()
69         mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
70                                           cloud_node=get_cloud_node,
71                                           actor_ref=mock_actor)
72
73         mock_actor.proxy.return_value = mock_proxy
74         self.last_shutdown = mock_proxy
75
76         return mock_actor
77
78     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
79                     avail_sizes=None,
80                     min_nodes=0, max_nodes=8,
81                     shutdown_windows=[54, 5, 1],
82                     max_total_price=None):
83         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
84             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
85
86         if not avail_sizes:
87             if cloud_nodes or want_sizes:
88                 avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
89             else:
90                 avail_sizes=[(testutil.MockSize(1), {"cores": 1})]
91
92         self.arv_factory = mock.MagicMock(name='arvados_mock')
93         api_client = mock.MagicMock(name='api_client')
94         api_client.nodes().create().execute.side_effect = \
95             [testutil.arvados_node_mock(1),
96              testutil.arvados_node_mock(2)]
97         self.arv_factory.return_value = api_client
98
99         self.cloud_factory = mock.MagicMock(name='cloud_mock')
100         self.cloud_factory().node_start_time.return_value = time.time()
101         self.cloud_updates = mock.MagicMock(name='updates_mock')
102         self.timer = testutil.MockTimer(deliver_immediately=False)
103         self.cloud_factory().node_id.side_effect = lambda node: node.id
104         self.cloud_factory().broken.return_value = False
105
106         self.node_setup = mock.MagicMock(name='setup_mock')
107         self.node_setup.start.side_effect = self.mock_node_start
108         self.node_setup.reset_mock()
109
110         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
111         self.node_shutdown.start.side_effect = self.mock_node_shutdown
112
113         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
114             self.server_wishlist_poller, self.arvados_nodes_poller,
115             self.cloud_nodes_poller, self.cloud_updates, self.timer,
116             self.arv_factory, self.cloud_factory,
117             shutdown_windows, ServerCalculator(avail_sizes),
118             min_nodes, max_nodes, 600, 1800, 3600,
119             self.node_setup, self.node_shutdown,
120             max_total_price=max_total_price).proxy()
121         if arvados_nodes is not None:
122             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
123         if cloud_nodes is not None:
124             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
125         if want_sizes is not None:
126             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
127
128     def monitor_list(self):
129         return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
130
131     def monitored_arvados_nodes(self, include_unpaired=True):
132         pairings = []
133         for future in [actor.proxy().arvados_node
134                        for actor in self.monitor_list()]:
135             try:
136                 g = future.get(self.TIMEOUT)
137                 if g or include_unpaired:
138                     pairings.append(g)
139             except pykka.ActorDeadError:
140                 pass
141         return pairings
142
143     def alive_monitor_count(self):
144         return len(self.monitored_arvados_nodes())
145
146     def paired_monitor_count(self):
147         return len(self.monitored_arvados_nodes(False))
148
149     def assertShutdownCancellable(self, expected=True):
150         self.assertTrue(self.node_shutdown.start.called)
151         self.assertIs(expected,
152                       self.node_shutdown.start.call_args[1]['cancellable'],
153                       "ComputeNodeShutdownActor incorrectly cancellable")
154
155     def test_easy_node_creation(self):
156         size = testutil.MockSize(1)
157         self.make_daemon(want_sizes=[size])
158         self.busywait(lambda: self.node_setup.start.called)
159         self.assertIn('node_quota', status.tracker._latest)
160
161     def check_monitors_arvados_nodes(self, *arv_nodes):
162         self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()))
163
164     def test_node_pairing(self):
165         cloud_node = testutil.cloud_node_mock(1)
166         arv_node = testutil.arvados_node_mock(1)
167         self.make_daemon([cloud_node], [arv_node])
168         self.check_monitors_arvados_nodes(arv_node)
169
170     def test_node_pairing_after_arvados_update(self):
171         cloud_node = testutil.cloud_node_mock(2)
172         self.make_daemon([cloud_node],
173                          [testutil.arvados_node_mock(1, ip_address=None)])
174         arv_node = testutil.arvados_node_mock(2)
175         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
176         self.check_monitors_arvados_nodes(arv_node)
177
178     def test_arvados_node_un_and_re_paired(self):
179         # We need to create the Arvados node mock after spinning up the daemon
180         # to make sure it's new enough to pair with the cloud node.
181         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
182                          arvados_nodes=None)
183         arv_node = testutil.arvados_node_mock(3)
184         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
185         self.check_monitors_arvados_nodes(arv_node)
186         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
187         self.busywait(lambda: 0 == self.alive_monitor_count())
188         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
189         self.check_monitors_arvados_nodes(arv_node)
190
191     def test_old_arvados_node_not_double_assigned(self):
192         arv_node = testutil.arvados_node_mock(3, age=9000)
193         size = testutil.MockSize(3)
194         self.make_daemon(arvados_nodes=[arv_node],
195                          avail_sizes=[(size, {"cores":1})])
196         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
197         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
198         self.stop_proxy(self.daemon)
199         used_nodes = [call[1].get('arvados_node')
200                       for call in self.node_setup.start.call_args_list]
201         self.assertEqual(2, len(used_nodes))
202         self.assertIn(arv_node, used_nodes)
203         self.assertIn(None, used_nodes)
204
205     def test_node_count_satisfied(self):
206         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
207                          want_sizes=[testutil.MockSize(1)])
208         self.busywait(lambda: not self.node_setup.start.called)
209
210     def test_select_stale_node_records_with_slot_numbers_first(self):
211         """
212         Stale node records with slot_number assigned can exist when
213         clean_arvados_node() isn't executed after a node shutdown, for
214         various reasons.
215         NodeManagerDaemonActor should use these stale node records first, so
216         that they don't accumulate unused, reducing the slots available.
217         """
218         size = testutil.MockSize(1)
219         a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
220         arvados_nodes = []
221         for n in range(9):
222             # Add several stale node records without slot_number assigned
223             arvados_nodes.append(
224                 testutil.arvados_node_mock(
225                     n+1,
226                     slot_number=None,
227                     modified_at=a_long_time_ago))
228         # Add one record with stale_node assigned, it should be the
229         # first one selected
230         arv_node = testutil.arvados_node_mock(
231             123,
232             modified_at=a_long_time_ago)
233         arvados_nodes.append(arv_node)
234         cloud_node = testutil.cloud_node_mock(125, size=size)
235         self.make_daemon(cloud_nodes=[cloud_node],
236                          arvados_nodes=arvados_nodes)
237         arvados_nodes_tracker = self.daemon.arvados_nodes.get()
238         # Here, find_stale_node() should return the node record with
239         # the slot_number assigned.
240         self.assertEqual(arv_node,
241                          arvados_nodes_tracker.find_stale_node(3601))
242
243     def test_dont_count_missing_as_busy(self):
244         size = testutil.MockSize(1)
245         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
246                                       testutil.cloud_node_mock(2, size=size)],
247                          arvados_nodes=[testutil.arvados_node_mock(1),
248                                         testutil.arvados_node_mock(
249                                             2,
250                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
251                          want_sizes=[size, size])
252         self.busywait(lambda: 2 == self.alive_monitor_count())
253         self.busywait(lambda: self.node_setup.start.called)
254
255     def test_missing_counts_towards_max(self):
256         size = testutil.MockSize(1)
257         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
258                                       testutil.cloud_node_mock(2, size=size)],
259                          arvados_nodes=[testutil.arvados_node_mock(1),
260                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
261                          want_sizes=[size, size],
262                          max_nodes=2)
263         self.busywait(lambda: not self.node_setup.start.called)
264
265     def test_excess_counts_missing(self):
266         size = testutil.MockSize(1)
267         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
268         self.make_daemon(cloud_nodes=cloud_nodes,
269                          arvados_nodes=[testutil.arvados_node_mock(1),
270                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
271                          want_sizes=[size])
272         self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
273         for mon_ref in self.monitor_list():
274             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
275         self.assertEqual(1, self.node_shutdown.start.call_count)
276
277     def test_missing_shutdown_not_excess(self):
278         size = testutil.MockSize(1)
279         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
280         self.make_daemon(cloud_nodes=cloud_nodes,
281                          arvados_nodes=[testutil.arvados_node_mock(1),
282                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
283                          want_sizes=[size])
284         self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
285         get_cloud_node = mock.MagicMock(name="get_cloud_node")
286         get_cloud_node.get.return_value = cloud_nodes[1]
287         mock_node_monitor = mock.MagicMock()
288         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
289         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
290
291         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
292
293         self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count()))
294         for mon_ref in self.monitor_list():
295             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
296         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
297
298     def test_booting_nodes_counted(self):
299         cloud_node = testutil.cloud_node_mock(1)
300         arv_node = testutil.arvados_node_mock(1)
301         server_wishlist = [testutil.MockSize(1)] * 2
302         self.make_daemon([cloud_node], [arv_node], server_wishlist)
303         self.daemon.max_nodes.get(self.TIMEOUT)
304         self.assertTrue(self.node_setup.start.called)
305         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
306         self.busywait(lambda: 1 == self.node_setup.start.call_count)
307
308     def test_boot_new_node_when_all_nodes_busy(self):
309         size = testutil.MockSize(2)
310         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
311         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
312                          [size], avail_sizes=[(size, {"cores":1})])
313         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
314         self.assertwait(lambda: self.assertEqual(1, self.node_setup.start.called))
315
316     def test_boot_new_node_below_min_nodes(self):
317         min_size = testutil.MockSize(1)
318         wish_size = testutil.MockSize(3)
319         avail_sizes = [(min_size, {"cores": 1}),
320                        (wish_size, {"cores": 3})]
321         self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
322         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
323         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
324         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
325         self.stop_proxy(self.daemon)
326         self.assertEqual([wish_size, min_size],
327                          [call[1].get('cloud_size')
328                           for call in self.node_setup.start.call_args_list])
329
330     def test_no_new_node_when_ge_min_nodes_busy(self):
331         size = testutil.MockSize(2)
332         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
333         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
334                      for n in range(1, 4)]
335         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
336         self.stop_proxy(self.daemon)
337         self.assertEqual(0, self.node_setup.start.call_count)
338
339     def test_no_new_node_when_max_nodes_busy(self):
340         size = testutil.MockSize(3)
341         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
342                          arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
343                          want_sizes=[size],
344                          max_nodes=1)
345         self.stop_proxy(self.daemon)
346         self.assertFalse(self.node_setup.start.called)
347
348     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
349         if cloud_node is None:
350             cloud_node = testutil.cloud_node_mock(id_num)
351         id_num = int(cloud_node.id)
352         if arv_node is None:
353             arv_node = testutil.arvados_node_mock(id_num)
354         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
355                          avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
356         self.daemon.max_nodes.get(self.TIMEOUT)
357         self.assertEqual(1, self.node_setup.start.call_count)
358         self.last_setup.cloud_node.get.return_value = cloud_node
359         self.last_setup.arvados_node.get.return_value = arv_node
360         return self.last_setup
361
362     def test_new_node_when_booted_node_not_usable(self):
363         cloud_node = testutil.cloud_node_mock(4)
364         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
365         setup = self.start_node_boot(cloud_node, arv_node)
366         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
367         self.assertEqual(1, self.alive_monitor_count())
368         self.daemon.update_arvados_nodes([arv_node])
369         self.daemon.update_cloud_nodes([cloud_node])
370         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
371         self.daemon.update_server_wishlist(
372             [testutil.MockSize(4)]).get(self.TIMEOUT)
373         self.stop_proxy(self.daemon)
374         self.assertEqual(2, self.node_setup.start.call_count)
375
376     def test_no_duplication_when_booting_node_listed_fast(self):
377         # Test that we don't start two ComputeNodeMonitorActors when
378         # we learn about a booting node through a listing before we
379         # get the "node up" message from CloudNodeSetupActor.
380         cloud_node = testutil.cloud_node_mock(1)
381         setup = self.start_node_boot(cloud_node)
382         self.daemon.update_cloud_nodes([cloud_node])
383         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
384         self.assertEqual(1, self.alive_monitor_count())
385
386     def test_no_duplication_when_booted_node_listed(self):
387         cloud_node = testutil.cloud_node_mock(2)
388         setup = self.start_node_boot(cloud_node, id_num=2)
389         self.daemon.node_setup_finished(setup)
390         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
391         self.assertEqual(1, self.alive_monitor_count())
392
393     def test_node_counted_after_boot_with_slow_listing(self):
394         # Test that, after we boot a compute node, we assume it exists
395         # even it doesn't appear in the listing (e.g., because of delays
396         # propagating tags).
397         setup = self.start_node_boot()
398         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
399         self.assertEqual(1, self.alive_monitor_count())
400         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
401         self.assertEqual(1, self.alive_monitor_count())
402
403     def test_booted_unlisted_node_counted(self):
404         setup = self.start_node_boot(id_num=1)
405         self.daemon.node_setup_finished(setup)
406         self.daemon.update_server_wishlist(
407             [testutil.MockSize(1)]).get(self.TIMEOUT)
408         self.stop_proxy(self.daemon)
409         self.assertEqual(1, self.node_setup.start.call_count)
410
411     def test_booted_node_can_shutdown(self):
412         setup = self.start_node_boot()
413         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
414         self.assertEqual(1, self.alive_monitor_count())
415         monitor = self.monitor_list()[0].proxy()
416         self.daemon.update_server_wishlist([])
417         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
418         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
419         self.stop_proxy(self.daemon)
420         self.assertTrue(self.node_shutdown.start.called,
421                         "daemon did not shut down booted node on offer")
422
423         with test_status.TestServer() as srv:
424             self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
425             self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
426             self.assertEqual(0, srv.get_status().get('nodes_wish', None))
427
428     def test_booted_node_lifecycle(self):
429         cloud_node = testutil.cloud_node_mock(6)
430         setup = self.start_node_boot(cloud_node, id_num=6)
431         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
432         self.assertEqual(1, self.alive_monitor_count())
433         monitor = self.monitor_list()[0].proxy()
434         self.daemon.update_server_wishlist([])
435         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
436         self.assertShutdownCancellable(True)
437         shutdown = self.node_shutdown.start().proxy()
438         shutdown.cloud_node.get.return_value = cloud_node
439         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
440         self.daemon.update_cloud_nodes([])
441         self.assertTrue(shutdown.stop.called,
442                         "shutdown actor not stopped after finishing")
443         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
444                         "monitor for booted node not stopped after shutdown")
445         self.daemon.update_server_wishlist(
446             [testutil.MockSize(2)]).get(self.TIMEOUT)
447         self.stop_proxy(self.daemon)
448         self.assertTrue(self.node_setup.start.called,
449                         "second node not started after booted node stopped")
450
451     def test_node_disappearing_during_shutdown(self):
452         cloud_node = testutil.cloud_node_mock(6)
453         setup = self.start_node_boot(cloud_node, id_num=6)
454         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
455         self.assertEqual(1, self.alive_monitor_count())
456         monitor = self.monitor_list()[0].proxy()
457         self.daemon.update_server_wishlist([])
458         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
459         self.assertShutdownCancellable(True)
460         shutdown = self.node_shutdown.start().proxy()
461         shutdown.cloud_node.get.return_value = cloud_node
462         # Simulate a successful but slow node destroy call: the cloud node
463         # list gets updated before the ShutdownActor finishes.
464         record = self.daemon.cloud_nodes.get().nodes.values()[0]
465         self.assertTrue(record.shutdown_actor is not None)
466         self.daemon.cloud_nodes.get().nodes.clear()
467         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
468         self.assertTrue(
469             record.shutdown_actor is not None,
470             "test was ineffective -- failed to simulate the race condition")
471
472     def test_booted_node_shut_down_when_never_listed(self):
473         setup = self.start_node_boot()
474         self.cloud_factory().node_start_time.return_value = time.time() - 3601
475         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
476         self.assertEqual(1, self.alive_monitor_count())
477         self.assertFalse(self.node_shutdown.start.called)
478         now = time.time()
479         self.monitor_list()[0].tell_proxy().consider_shutdown()
480         self.busywait(lambda: self.node_shutdown.start.called)
481         self.assertShutdownCancellable(False)
482
483     def test_booted_node_shut_down_when_never_paired(self):
484         cloud_node = testutil.cloud_node_mock(2)
485         setup = self.start_node_boot(cloud_node)
486         self.cloud_factory().node_start_time.return_value = time.time() - 3601
487         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
488         self.assertEqual(1, self.alive_monitor_count())
489         self.daemon.update_cloud_nodes([cloud_node])
490         self.monitor_list()[0].tell_proxy().consider_shutdown()
491         self.busywait(lambda: self.node_shutdown.start.called)
492         self.assertShutdownCancellable(False)
493
494     def test_booted_node_shut_down_when_never_working(self):
495         cloud_node = testutil.cloud_node_mock(4)
496         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
497         setup = self.start_node_boot(cloud_node, arv_node)
498         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
499         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
500         self.assertEqual(1, self.alive_monitor_count())
501         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
502         self.daemon.update_cloud_nodes([cloud_node])
503         self.busywait(lambda: self.node_shutdown.start.called)
504         self.assertShutdownCancellable(False)
505
506     def test_node_that_pairs_not_considered_failed_boot(self):
507         cloud_node = testutil.cloud_node_mock(3)
508         arv_node = testutil.arvados_node_mock(3)
509         setup = self.start_node_boot(cloud_node, arv_node)
510         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
511         self.assertEqual(1, self.alive_monitor_count())
512         self.daemon.update_cloud_nodes([cloud_node])
513         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
514         self.timer.deliver()
515         self.stop_proxy(self.daemon)
516         self.assertFalse(self.node_shutdown.start.called)
517
518     def test_node_that_pairs_busy_not_considered_failed_boot(self):
519         cloud_node = testutil.cloud_node_mock(5)
520         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
521         setup = self.start_node_boot(cloud_node, arv_node)
522         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
523         self.assertEqual(1, self.alive_monitor_count())
524         self.daemon.update_cloud_nodes([cloud_node])
525         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
526         self.timer.deliver()
527         self.stop_proxy(self.daemon)
528         self.assertFalse(self.node_shutdown.start.called)
529
530     def test_booting_nodes_shut_down(self):
531         self.make_daemon(want_sizes=[testutil.MockSize(1)])
532         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
533         self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
534
535     def test_all_booting_nodes_tried_to_shut_down(self):
536         size = testutil.MockSize(2)
537         self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
538         self.daemon.max_nodes.get(self.TIMEOUT)
539         setup1 = self.last_setup
540         setup1.stop_if_no_cloud_node().get.return_value = False
541         setup1.stop_if_no_cloud_node.reset_mock()
542         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
543         self.daemon.max_nodes.get(self.TIMEOUT)
544         self.assertIsNot(setup1, self.last_setup)
545         self.last_setup.stop_if_no_cloud_node().get.return_value = True
546         self.last_setup.stop_if_no_cloud_node.reset_mock()
547         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
548         self.daemon.max_nodes.get(self.TIMEOUT)
549         self.stop_proxy(self.daemon)
550         self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
551         self.assertTrue(setup1.stop_if_no_cloud_node.called)
552
553     def test_shutdown_declined_at_wishlist_capacity(self):
554         cloud_node = testutil.cloud_node_mock(1)
555         arv_node = testutil.arvados_node_mock(1)
556         size = testutil.MockSize(1)
557         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
558         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
559         monitor = self.monitor_list()[0].proxy()
560         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
561         self.stop_proxy(self.daemon)
562         self.assertFalse(self.node_shutdown.start.called)
563
564     def test_shutdown_declined_below_min_nodes(self):
565         cloud_node = testutil.cloud_node_mock(1)
566         arv_node = testutil.arvados_node_mock(1)
567         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
568         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
569         monitor = self.monitor_list()[0].proxy()
570         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
571         self.stop_proxy(self.daemon)
572         self.assertFalse(self.node_shutdown.start.called)
573
574     def test_shutdown_accepted_below_capacity(self):
575         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
576         self.busywait(lambda: 1 == self.alive_monitor_count())
577         monitor = self.monitor_list()[0].proxy()
578         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
579         self.busywait(lambda: self.node_shutdown.start.called)
580
581     def test_shutdown_declined_when_idle_and_job_queued(self):
582         size = testutil.MockSize(1)
583         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
584         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
585                      testutil.arvados_node_mock(4, job_uuid=None)]
586         self.make_daemon(cloud_nodes, arv_nodes, [size])
587         self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
588         for mon_ref in self.monitor_list():
589             monitor = mon_ref.proxy()
590             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
591                 break
592         else:
593             self.fail("monitor for idle node not found")
594         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
595         self.stop_proxy(self.daemon)
596         self.assertFalse(self.node_shutdown.start.called)
597
598     def test_node_shutdown_after_cancelled_shutdown(self):
599         cloud_node = testutil.cloud_node_mock(5)
600         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
601         self.assertEqual(1, self.alive_monitor_count())
602         monitor = self.monitor_list()[0].proxy()
603         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
604         self.last_shutdown.success.get.return_value = False
605         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
606         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
607
608         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
609         self.last_shutdown.success.get.return_value = True
610         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
611         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
612         self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count()))
613
614     def test_nodes_shutting_down_replaced_below_max_nodes(self):
615         size = testutil.MockSize(6)
616         cloud_node = testutil.cloud_node_mock(6, size=size)
617         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
618                          avail_sizes=[(size, {"cores":1})])
619         self.assertEqual(1, self.alive_monitor_count())
620         monitor = self.monitor_list()[0].proxy()
621         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
622         self.assertTrue(self.node_shutdown.start.called)
623         self.daemon.update_server_wishlist(
624             [testutil.MockSize(6)]).get(self.TIMEOUT)
625         self.busywait(lambda: self.node_setup.start.called)
626
627     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
628         cloud_node = testutil.cloud_node_mock(7)
629         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
630                          max_nodes=1)
631         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
632         monitor = self.monitor_list()[0].proxy()
633         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
634         self.assertTrue(self.node_shutdown.start.called)
635         self.daemon.update_server_wishlist(
636             [testutil.MockSize(7)]).get(self.TIMEOUT)
637         self.busywait(lambda: not self.node_setup.start.called)
638
639     def test_nodes_shutting_down_count_against_excess(self):
640         size = testutil.MockSize(8)
641         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
642         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
643         self.make_daemon(cloud_nodes, arv_nodes, [size],
644                          avail_sizes=[(size, {"cores":1})])
645         self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
646         for mon_ref in self.monitor_list():
647             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
648         self.assertEqual(1, self.node_shutdown.start.call_count)
649
650     def test_clean_shutdown_waits_for_node_setup_finish(self):
651         new_node = self.start_node_boot()
652         new_node.stop_if_no_cloud_node().get.return_value = False
653         new_node.stop_if_no_cloud_node.reset_mock()
654         self.daemon.shutdown().get(self.TIMEOUT)
655         self.assertTrue(new_node.stop_if_no_cloud_node.called)
656         self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
657         self.assertTrue(new_node.stop.called)
658         self.timer.deliver()
659         self.assertTrue(
660             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
661
662     def test_wishlist_ignored_after_shutdown(self):
663         new_node = self.start_node_boot()
664         new_node.stop_if_no_cloud_node().get.return_value = False
665         new_node.stop_if_no_cloud_node.reset_mock()
666         self.daemon.shutdown().get(self.TIMEOUT)
667         size = testutil.MockSize(2)
668         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
669         self.timer.deliver()
670         self.busywait(lambda: 1 == self.node_setup.start.call_count)
671
672     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
673         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
674         self.assertEqual(1, self.alive_monitor_count())
675         monitor = self.monitor_list()[0].proxy()
676         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
677         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
678         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
679
680     def test_idle_node_disappearing_clears_status_idle_time_counter(self):
681         size = testutil.MockSize(1)
682         status.tracker._idle_nodes = {}
683         cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
684         arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
685         self.make_daemon(cloud_nodes, arv_nodes, [size])
686         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
687         for mon_ref in self.monitor_list():
688             monitor = mon_ref.proxy()
689             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
690                 break
691         else:
692             self.fail("monitor for idle node not found")
693         self.assertEqual(1, status.tracker.get('nodes_idle'))
694         hostname = monitor.arvados_node.get()['hostname']
695         self.assertIn(hostname, status.tracker._idle_nodes)
696         # Simulate the node disappearing from the cloud node list
697         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
698         self.busywait(lambda: 0 == self.alive_monitor_count())
699         self.assertNotIn(hostname, status.tracker._idle_nodes)
700
701     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
702         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
703         self.assertEqual(1, self.alive_monitor_count())
704         monitor = self.monitor_list()[0].proxy()
705         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
706         # We're mainly testing that update_cloud_nodes catches and handles
707         # the ActorDeadError.
708         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
709         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
710         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
711
712     def test_node_create_two_sizes(self):
713         small = testutil.MockSize(1)
714         big = testutil.MockSize(2)
715         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
716                         (testutil.MockSize(2), {"cores":2})]
717         self.make_daemon(want_sizes=[small, small, small, big],
718                          avail_sizes=avail_sizes, max_nodes=4)
719
720         # the daemon runs in another thread, so we need to wait and see
721         # if it does all the work we're expecting it to do before stopping it.
722         self.busywait(lambda: self.node_setup.start.call_count == 4)
723         booting = self.daemon.booting.get(self.TIMEOUT)
724         self.stop_proxy(self.daemon)
725         sizecounts = {a[0].id: 0 for a in avail_sizes}
726         for b in booting.itervalues():
727             sizecounts[b.cloud_size.get().id] += 1
728         logging.info(sizecounts)
729         self.assertEqual(3, sizecounts[small.id])
730         self.assertEqual(1, sizecounts[big.id])
731
732     def test_node_max_nodes_two_sizes(self):
733         small = testutil.MockSize(1)
734         big = testutil.MockSize(2)
735         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
736                         (testutil.MockSize(2), {"cores":2})]
737         self.make_daemon(want_sizes=[small, small, big, small],
738                          avail_sizes=avail_sizes, max_nodes=3)
739
740         # the daemon runs in another thread, so we need to wait and see
741         # if it does all the work we're expecting it to do before stopping it.
742         self.busywait(lambda: self.node_setup.start.call_count == 3)
743         booting = self.daemon.booting.get(self.TIMEOUT)
744         self.stop_proxy(self.daemon)
745         sizecounts = {a[0].id: 0 for a in avail_sizes}
746         for b in booting.itervalues():
747             sizecounts[b.cloud_size.get().id] += 1
748         self.assertEqual(2, sizecounts[small.id])
749         self.assertEqual(1, sizecounts[big.id])
750
751     def test_wishlist_ordering(self):
752         # Check that big nodes aren't prioritized; since #12199 containers are
753         # scheduled on specific node sizes.
754         small = testutil.MockSize(1)
755         big = testutil.MockSize(2)
756         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
757                         (testutil.MockSize(2), {"cores":2})]
758         self.make_daemon(want_sizes=[small, small, small, big],
759                          avail_sizes=avail_sizes, max_nodes=3)
760
761         # the daemon runs in another thread, so we need to wait and see
762         # if it does all the work we're expecting it to do before stopping it.
763         self.busywait(lambda: self.node_setup.start.call_count == 3)
764         booting = self.daemon.booting.get(self.TIMEOUT)
765         self.stop_proxy(self.daemon)
766         sizecounts = {a[0].id: 0 for a in avail_sizes}
767         for b in booting.itervalues():
768             sizecounts[b.cloud_size.get().id] += 1
769         self.assertEqual(3, sizecounts[small.id])
770         self.assertEqual(0, sizecounts[big.id])
771
772     def test_wishlist_reconfigure(self):
773         small = testutil.MockSize(1)
774         big = testutil.MockSize(2)
775         avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
776
777         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
778                                       testutil.cloud_node_mock(2, small),
779                                       testutil.cloud_node_mock(3, big)],
780                          arvados_nodes=[testutil.arvados_node_mock(1),
781                                         testutil.arvados_node_mock(2),
782                                         testutil.arvados_node_mock(3)],
783                          want_sizes=[small, small, big],
784                          avail_sizes=avail_sizes)
785         self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count()))
786         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
787
788         self.assertEqual(0, self.node_shutdown.start.call_count)
789
790         for c in self.daemon.cloud_nodes.get().nodes.itervalues():
791             self.daemon.node_can_shutdown(c.actor)
792
793         booting = self.daemon.booting.get()
794         cloud_nodes = self.daemon.cloud_nodes.get()
795
796         self.busywait(lambda: 1 == self.node_setup.start.call_count)
797         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
798
799         self.stop_proxy(self.daemon)
800
801         # booting a new big node
802         sizecounts = {a[0].id: 0 for a in avail_sizes}
803         for b in booting.itervalues():
804             sizecounts[b.cloud_size.get().id] += 1
805         self.assertEqual(0, sizecounts[small.id])
806         self.assertEqual(1, sizecounts[big.id])
807
808         # shutting down a small node
809         sizecounts = {a[0].id: 0 for a in avail_sizes}
810         for b in cloud_nodes.nodes.itervalues():
811             if b.shutdown_actor is not None:
812                 sizecounts[b.cloud_node.size.id] += 1
813         self.assertEqual(1, sizecounts[small.id])
814         self.assertEqual(0, sizecounts[big.id])
815
816     def test_node_max_price(self):
817         small = testutil.MockSize(1)
818         big = testutil.MockSize(2)
819         avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
820                         (testutil.MockSize(2), {"cores":2, "price":2})]
821         self.make_daemon(want_sizes=[small, small, small, big],
822                          avail_sizes=avail_sizes,
823                          max_nodes=4,
824                          max_total_price=4)
825         # the daemon runs in another thread, so we need to wait and see
826         # if it does all the work we're expecting it to do before stopping it.
827         self.busywait(lambda: self.node_setup.start.call_count == 3)
828         booting = self.daemon.booting.get()
829         self.stop_proxy(self.daemon)
830
831         sizecounts = {a[0].id: 0 for a in avail_sizes}
832         for b in booting.itervalues():
833             sizecounts[b.cloud_size.get().id] += 1
834         logging.info(sizecounts)
835
836         # Booting 3 small nodes and not booting a big node would also partially
837         # satisfy the wishlist and come in under the price cap, however the way
838         # the update_server_wishlist() currently works effectively results in a
839         # round-robin creation of one node of each size in the wishlist, so
840         # test for that.
841         self.assertEqual(2, sizecounts[small.id])
842         self.assertEqual(1, sizecounts[big.id])