15964: Remove qr1hi from a few more places. Delete unused includes.
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import time
9 import unittest
10
11 import mock
12 import pykka
13
14 import arvnodeman.daemon as nmdaemon
15 import arvnodeman.status as status
16 from arvnodeman.jobqueue import ServerCalculator
17 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
18 from . import testutil
19 from . import test_status
20 from . import pykka_timeout
21 import logging
22
23 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
24                                      unittest.TestCase):
25
26     def assertwait(self, f, timeout=pykka_timeout*2):
27         deadline = time.time() + timeout
28         while True:
29             try:
30                 return f()
31             except AssertionError:
32                 if time.time() > deadline:
33                     raise
34                 pass
35             time.sleep(.1)
36             self.daemon.ping().get(self.TIMEOUT)
37
38     def busywait(self, f):
39         for n in xrange(200):
40             ok = f()
41             if ok:
42                 return
43             time.sleep(.1)
44             self.daemon.ping().get(self.TIMEOUT)
45         self.assertTrue(ok) # always falsy, but not necessarily False
46
47     def mock_node_start(self, **kwargs):
48         # Make sure that every time the daemon starts a setup actor,
49         # it gets a new mock object back.
50         get_cloud_size = mock.MagicMock()
51         get_cloud_size.get.return_value = kwargs["cloud_size"]
52         mock_actor = mock.MagicMock()
53         mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
54                                           cloud_size=get_cloud_size,
55                                           actor_ref=mock_actor)
56         mock_actor.proxy.return_value = mock_proxy
57         mock_actor.tell_proxy.return_value = mock_proxy
58
59         self.last_setup = mock_proxy
60         return mock_actor
61
62     def mock_node_shutdown(self, **kwargs):
63         # Make sure that every time the daemon starts a shutdown actor,
64         # it gets a new mock object back.
65         get_cloud_node = mock.MagicMock()
66         if "node_monitor" in kwargs:
67             get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
68         mock_actor = mock.MagicMock()
69         mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
70                                           cloud_node=get_cloud_node,
71                                           actor_ref=mock_actor)
72
73         mock_actor.proxy.return_value = mock_proxy
74         self.last_shutdown = mock_proxy
75
76         return mock_actor
77
78     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
79                     avail_sizes=None,
80                     min_nodes=0, max_nodes=8,
81                     shutdown_windows=[54, 5, 1],
82                     max_total_price=None):
83         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
84             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
85
86         if not avail_sizes:
87             if cloud_nodes or want_sizes:
88                 avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
89             else:
90                 avail_sizes=[(testutil.MockSize(1), {"cores": 1})]
91
92         self.arv_factory = mock.MagicMock(name='arvados_mock')
93         api_client = mock.MagicMock(name='api_client')
94         api_client.nodes().create().execute.side_effect = \
95             [testutil.arvados_node_mock(1),
96              testutil.arvados_node_mock(2)]
97         self.arv_factory.return_value = api_client
98
99         self.cloud_factory = mock.MagicMock(name='cloud_mock')
100         self.cloud_factory().node_start_time.return_value = time.time()
101         self.cloud_updates = mock.MagicMock(name='updates_mock')
102         self.timer = testutil.MockTimer(deliver_immediately=False)
103         self.cloud_factory().node_id.side_effect = lambda node: node.id
104         self.cloud_factory().broken.return_value = False
105
106         self.node_setup = mock.MagicMock(name='setup_mock')
107         self.node_setup.start.side_effect = self.mock_node_start
108         self.node_setup.reset_mock()
109
110         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
111         self.node_shutdown.start.side_effect = self.mock_node_shutdown
112
113         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
114             self.server_wishlist_poller, self.arvados_nodes_poller,
115             self.cloud_nodes_poller, self.cloud_updates, self.timer,
116             self.arv_factory, self.cloud_factory,
117             shutdown_windows, ServerCalculator(avail_sizes),
118             min_nodes, max_nodes, 600, 1800, 3600,
119             self.node_setup, self.node_shutdown,
120             max_total_price=max_total_price).proxy()
121         if arvados_nodes is not None:
122             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
123         if cloud_nodes is not None:
124             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
125         if want_sizes is not None:
126             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
127
128     def monitor_list(self):
129         return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
130
131     def monitored_arvados_nodes(self, include_unpaired=True):
132         pairings = []
133         for future in [actor.proxy().arvados_node
134                        for actor in self.monitor_list()]:
135             try:
136                 g = future.get(self.TIMEOUT)
137                 if g or include_unpaired:
138                     pairings.append(g)
139             except pykka.ActorDeadError:
140                 pass
141         return pairings
142
143     def alive_monitor_count(self):
144         return len(self.monitored_arvados_nodes())
145
146     def paired_monitor_count(self):
147         return len(self.monitored_arvados_nodes(False))
148
149     def assertShutdownCancellable(self, expected=True):
150         self.assertTrue(self.node_shutdown.start.called)
151         self.assertIs(expected,
152                       self.node_shutdown.start.call_args[1]['cancellable'],
153                       "ComputeNodeShutdownActor incorrectly cancellable")
154
155     def test_easy_node_creation(self):
156         size = testutil.MockSize(1)
157         self.make_daemon(want_sizes=[size])
158         self.busywait(lambda: self.node_setup.start.called)
159         self.assertIn('node_quota', status.tracker._latest)
160
161     def check_monitors_arvados_nodes(self, *arv_nodes):
162         self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()))
163
164     def test_node_pairing(self):
165         cloud_node = testutil.cloud_node_mock(1)
166         arv_node = testutil.arvados_node_mock(1)
167         self.make_daemon([cloud_node], [arv_node])
168         self.check_monitors_arvados_nodes(arv_node)
169
170     def test_node_pairing_after_arvados_update(self):
171         cloud_node = testutil.cloud_node_mock(2)
172         self.make_daemon([cloud_node],
173                          [testutil.arvados_node_mock(1, ip_address=None)])
174         arv_node = testutil.arvados_node_mock(2)
175         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
176         self.check_monitors_arvados_nodes(arv_node)
177
178     def test_arvados_node_un_and_re_paired(self):
179         # We need to create the Arvados node mock after spinning up the daemon
180         # to make sure it's new enough to pair with the cloud node.
181         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
182                          arvados_nodes=None)
183         arv_node = testutil.arvados_node_mock(3)
184         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
185         self.check_monitors_arvados_nodes(arv_node)
186         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
187         self.busywait(lambda: 0 == self.alive_monitor_count())
188         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
189         self.check_monitors_arvados_nodes(arv_node)
190
191     def test_old_arvados_node_not_double_assigned(self):
192         arv_node = testutil.arvados_node_mock(3, age=9000)
193         size = testutil.MockSize(3)
194         self.make_daemon(arvados_nodes=[arv_node],
195                          avail_sizes=[(size, {"cores":1})])
196         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
197         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
198         self.stop_proxy(self.daemon)
199         used_nodes = [call[1].get('arvados_node')
200                       for call in self.node_setup.start.call_args_list]
201         self.assertEqual(2, len(used_nodes))
202         self.assertIn(arv_node, used_nodes)
203         self.assertIn(None, used_nodes)
204
205     def test_node_count_satisfied(self):
206         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
207                          want_sizes=[testutil.MockSize(1)])
208         self.busywait(lambda: not self.node_setup.start.called)
209
210     def test_select_stale_node_records_with_slot_numbers_first(self):
211         """
212         Stale node records with slot_number assigned can exist when
213         clean_arvados_node() isn't executed after a node shutdown, for
214         various reasons.
215         NodeManagerDaemonActor should use these stale node records first, so
216         that they don't accumulate unused, reducing the slots available.
217         """
218         size = testutil.MockSize(1)
219         a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
220         arvados_nodes = []
221         for n in range(9):
222             # Add several stale node records without slot_number assigned
223             arvados_nodes.append(
224                 testutil.arvados_node_mock(
225                     n+1,
226                     slot_number=None,
227                     modified_at=a_long_time_ago))
228         # Add one record with stale_node assigned, it should be the
229         # first one selected
230         arv_node = testutil.arvados_node_mock(
231             123,
232             modified_at=a_long_time_ago)
233         arvados_nodes.append(arv_node)
234         cloud_node = testutil.cloud_node_mock(125, size=size)
235         self.make_daemon(cloud_nodes=[cloud_node],
236                          arvados_nodes=arvados_nodes)
237         arvados_nodes_tracker = self.daemon.arvados_nodes.get()
238         # Here, find_stale_node() should return the node record with
239         # the slot_number assigned.
240         self.assertEqual(arv_node,
241                          arvados_nodes_tracker.find_stale_node(3601))
242
243     def test_dont_count_missing_as_busy(self):
244         size = testutil.MockSize(1)
245         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
246                                       testutil.cloud_node_mock(2, size=size)],
247                          arvados_nodes=[testutil.arvados_node_mock(1),
248                                         testutil.arvados_node_mock(
249                                             2,
250                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
251                          want_sizes=[size, size])
252         self.busywait(lambda: 2 == self.alive_monitor_count())
253         self.busywait(lambda: self.node_setup.start.called)
254
255     def test_missing_counts_towards_max(self):
256         size = testutil.MockSize(1)
257         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
258                                       testutil.cloud_node_mock(2, size=size)],
259                          arvados_nodes=[testutil.arvados_node_mock(1),
260                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
261                          want_sizes=[size, size],
262                          max_nodes=2)
263         self.busywait(lambda: not self.node_setup.start.called)
264
265     def test_excess_counts_missing(self):
266         size = testutil.MockSize(1)
267         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
268         self.make_daemon(cloud_nodes=cloud_nodes,
269                          arvados_nodes=[testutil.arvados_node_mock(1),
270                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
271                          want_sizes=[size])
272         self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
273         for mon_ref in self.monitor_list():
274             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
275         self.assertEqual(1, self.node_shutdown.start.call_count)
276
277     def test_missing_shutdown_not_excess(self):
278         size = testutil.MockSize(1)
279         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
280         self.make_daemon(cloud_nodes=cloud_nodes,
281                          arvados_nodes=[testutil.arvados_node_mock(1),
282                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
283                          want_sizes=[size])
284         self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
285         get_cloud_node = mock.MagicMock(name="get_cloud_node")
286         get_cloud_node.get.return_value = cloud_nodes[1]
287         mock_node_monitor = mock.MagicMock()
288         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
289         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
290
291         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
292
293         self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count()))
294         for mon_ref in self.monitor_list():
295             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
296         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
297
298     def test_booting_nodes_counted(self):
299         cloud_node = testutil.cloud_node_mock(1)
300         arv_node = testutil.arvados_node_mock(1)
301         server_wishlist = [testutil.MockSize(1)] * 2
302         self.make_daemon([cloud_node], [arv_node], server_wishlist)
303         self.daemon.max_nodes.get(self.TIMEOUT)
304         self.assertTrue(self.node_setup.start.called)
305         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
306         self.busywait(lambda: 1 == self.node_setup.start.call_count)
307
308     def test_boot_new_node_when_all_nodes_busy(self):
309         size = testutil.MockSize(2)
310         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
311         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
312                          [size], avail_sizes=[(size, {"cores":1})])
313         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
314         self.assertwait(lambda: self.assertEqual(1, self.node_setup.start.called))
315
316     def test_boot_new_node_below_min_nodes(self):
317         min_size = testutil.MockSize(1)
318         wish_size = testutil.MockSize(3)
319         avail_sizes = [(min_size, {"cores": 1}),
320                        (wish_size, {"cores": 3})]
321         self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
322         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
323         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
324         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
325         self.stop_proxy(self.daemon)
326         self.assertEqual([wish_size, min_size],
327                          [call[1].get('cloud_size')
328                           for call in self.node_setup.start.call_args_list])
329
330     def test_no_new_node_when_ge_min_nodes_busy(self):
331         size = testutil.MockSize(2)
332         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
333         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
334                      for n in range(1, 4)]
335         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
336         self.stop_proxy(self.daemon)
337         self.assertEqual(0, self.node_setup.start.call_count)
338
339     def test_no_new_node_when_max_nodes_busy(self):
340         size = testutil.MockSize(3)
341         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
342                          arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
343                          want_sizes=[size],
344                          max_nodes=1)
345         self.stop_proxy(self.daemon)
346         self.assertFalse(self.node_setup.start.called)
347
348     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
349         if cloud_node is None:
350             cloud_node = testutil.cloud_node_mock(id_num)
351         id_num = int(cloud_node.id)
352         if arv_node is None:
353             arv_node = testutil.arvados_node_mock(id_num)
354         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
355                          avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
356         self.daemon.max_nodes.get(self.TIMEOUT)
357         self.assertEqual(1, self.node_setup.start.call_count)
358         self.last_setup.cloud_node.get.return_value = cloud_node
359         self.last_setup.arvados_node.get.return_value = arv_node
360         return self.last_setup
361
362     def test_new_node_when_booted_node_not_usable(self):
363         cloud_node = testutil.cloud_node_mock(4)
364         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
365         setup = self.start_node_boot(cloud_node, arv_node)
366         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
367         self.assertEqual(1, self.alive_monitor_count())
368         self.daemon.update_arvados_nodes([arv_node])
369         self.daemon.update_cloud_nodes([cloud_node])
370         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
371         self.daemon.update_server_wishlist(
372             [testutil.MockSize(4)]).get(self.TIMEOUT)
373         self.stop_proxy(self.daemon)
374         self.assertEqual(2, self.node_setup.start.call_count)
375
376     def test_no_duplication_when_booting_node_listed_fast(self):
377         # Test that we don't start two ComputeNodeMonitorActors when
378         # we learn about a booting node through a listing before we
379         # get the "node up" message from CloudNodeSetupActor.
380         cloud_node = testutil.cloud_node_mock(1)
381         setup = self.start_node_boot(cloud_node)
382         self.daemon.update_cloud_nodes([cloud_node])
383         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
384         self.assertEqual(1, self.alive_monitor_count())
385
386     def test_no_duplication_when_booted_node_listed(self):
387         cloud_node = testutil.cloud_node_mock(2)
388         setup = self.start_node_boot(cloud_node, id_num=2)
389         self.daemon.node_setup_finished(setup)
390         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
391         self.assertEqual(1, self.alive_monitor_count())
392
393     def test_node_counted_after_boot_with_slow_listing(self):
394         # Test that, after we boot a compute node, we assume it exists
395         # even it doesn't appear in the listing (e.g., because of delays
396         # propagating tags).
397         setup = self.start_node_boot()
398         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
399         self.assertEqual(1, self.alive_monitor_count())
400         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
401         self.assertEqual(1, self.alive_monitor_count())
402
403     def test_booted_unlisted_node_counted(self):
404         setup = self.start_node_boot(id_num=1)
405         self.daemon.node_setup_finished(setup)
406         self.daemon.update_server_wishlist(
407             [testutil.MockSize(1)]).get(self.TIMEOUT)
408         self.stop_proxy(self.daemon)
409         self.assertEqual(1, self.node_setup.start.call_count)
410
411     def test_booted_node_can_shutdown(self):
412         setup = self.start_node_boot()
413         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
414         self.assertEqual(1, self.alive_monitor_count())
415         monitor = self.monitor_list()[0].proxy()
416         self.daemon.update_server_wishlist([])
417         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
418         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
419         self.stop_proxy(self.daemon)
420         self.assertTrue(self.node_shutdown.start.called,
421                         "daemon did not shut down booted node on offer")
422
423         with test_status.TestServer() as srv:
424             self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
425             self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
426             self.assertEqual(0, srv.get_status().get('nodes_wish', None))
427
428     def test_booted_node_lifecycle(self):
429         cloud_node = testutil.cloud_node_mock(6)
430         setup = self.start_node_boot(cloud_node, id_num=6)
431         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
432         self.assertEqual(1, self.alive_monitor_count())
433         monitor = self.monitor_list()[0].proxy()
434         self.daemon.update_server_wishlist([])
435         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
436         self.assertShutdownCancellable(True)
437         shutdown = self.node_shutdown.start().proxy()
438         shutdown.cloud_node.get.return_value = cloud_node
439         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
440         self.daemon.update_cloud_nodes([])
441         self.assertTrue(shutdown.stop.called,
442                         "shutdown actor not stopped after finishing")
443         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
444                         "monitor for booted node not stopped after shutdown")
445         self.daemon.update_server_wishlist(
446             [testutil.MockSize(2)]).get(self.TIMEOUT)
447         self.stop_proxy(self.daemon)
448         self.assertTrue(self.node_setup.start.called,
449                         "second node not started after booted node stopped")
450
451     def test_node_disappearing_during_shutdown(self):
452         cloud_node = testutil.cloud_node_mock(6)
453         setup = self.start_node_boot(cloud_node, id_num=6)
454         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
455         self.assertEqual(1, self.alive_monitor_count())
456         monitor = self.monitor_list()[0].proxy()
457         self.daemon.update_server_wishlist([])
458         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
459         self.assertShutdownCancellable(True)
460         shutdown = self.node_shutdown.start().proxy()
461         shutdown.cloud_node.get.return_value = cloud_node
462         # Simulate a successful but slow node destroy call: the cloud node
463         # list gets updated before the ShutdownActor finishes.
464         record = self.daemon.cloud_nodes.get().nodes.values()[0]
465         self.assertTrue(record.shutdown_actor is not None)
466         self.daemon.cloud_nodes.get().nodes.clear()
467         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
468         self.assertTrue(
469             record.shutdown_actor is not None,
470             "test was ineffective -- failed to simulate the race condition")
471
472     def test_booted_node_shut_down_when_never_listed(self):
473         setup = self.start_node_boot()
474         self.cloud_factory().node_start_time.return_value = time.time() - 3601
475         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
476         self.assertEqual(1, self.alive_monitor_count())
477         self.assertFalse(self.node_shutdown.start.called)
478         now = time.time()
479         self.monitor_list()[0].tell_proxy().consider_shutdown()
480         self.busywait(lambda: self.node_shutdown.start.called)
481         self.assertShutdownCancellable(False)
482
483     def test_booted_node_shut_down_when_never_paired(self):
484         cloud_node = testutil.cloud_node_mock(2)
485         setup = self.start_node_boot(cloud_node)
486         self.cloud_factory().node_start_time.return_value = time.time() - 3601
487         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
488         self.assertEqual(1, self.alive_monitor_count())
489         self.daemon.update_cloud_nodes([cloud_node])
490         self.monitor_list()[0].tell_proxy().consider_shutdown()
491         self.busywait(lambda: self.node_shutdown.start.called)
492         self.assertShutdownCancellable(False)
493
494     def test_booted_node_shut_down_when_never_working(self):
495         cloud_node = testutil.cloud_node_mock(4)
496         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
497         setup = self.start_node_boot(cloud_node, arv_node)
498         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
499         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
500         self.assertEqual(1, self.alive_monitor_count())
501         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
502         self.daemon.update_cloud_nodes([cloud_node])
503         self.busywait(lambda: self.node_shutdown.start.called)
504         self.assertShutdownCancellable(False)
505
506     def test_node_that_pairs_not_considered_failed_boot(self):
507         cloud_node = testutil.cloud_node_mock(3)
508         arv_node = testutil.arvados_node_mock(3)
509         setup = self.start_node_boot(cloud_node, arv_node)
510         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
511         self.assertEqual(1, self.alive_monitor_count())
512         self.daemon.update_cloud_nodes([cloud_node])
513         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
514         self.timer.deliver()
515         self.stop_proxy(self.daemon)
516         self.assertFalse(self.node_shutdown.start.called)
517
518     def test_node_that_pairs_busy_not_considered_failed_boot(self):
519         cloud_node = testutil.cloud_node_mock(5)
520         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
521         setup = self.start_node_boot(cloud_node, arv_node)
522         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
523         self.assertEqual(1, self.alive_monitor_count())
524         self.daemon.update_cloud_nodes([cloud_node])
525         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
526         self.timer.deliver()
527         self.stop_proxy(self.daemon)
528         self.assertFalse(self.node_shutdown.start.called)
529
530     def test_booting_nodes_shut_down(self):
531         self.make_daemon(want_sizes=[testutil.MockSize(1)])
532         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
533         self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
534
535     def test_all_booting_nodes_tried_to_shut_down(self):
536         size = testutil.MockSize(2)
537         self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
538         self.daemon.max_nodes.get(self.TIMEOUT)
539         setup1 = self.last_setup
540         setup1.stop_if_no_cloud_node().get.return_value = False
541         setup1.stop_if_no_cloud_node.reset_mock()
542         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
543         self.daemon.max_nodes.get(self.TIMEOUT)
544         self.assertIsNot(setup1, self.last_setup)
545         self.last_setup.stop_if_no_cloud_node().get.return_value = True
546         self.last_setup.stop_if_no_cloud_node.reset_mock()
547         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
548         self.daemon.max_nodes.get(self.TIMEOUT)
549         self.stop_proxy(self.daemon)
550         self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
551         self.assertTrue(setup1.stop_if_no_cloud_node.called)
552
553     def test_shutdown_declined_at_wishlist_capacity(self):
554         cloud_node = testutil.cloud_node_mock(1)
555         arv_node = testutil.arvados_node_mock(1)
556         size = testutil.MockSize(1)
557         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
558         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
559         monitor = self.monitor_list()[0].proxy()
560         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
561         self.stop_proxy(self.daemon)
562         self.assertFalse(self.node_shutdown.start.called)
563
564     def test_shutdown_declined_below_min_nodes(self):
565         cloud_node = testutil.cloud_node_mock(1)
566         arv_node = testutil.arvados_node_mock(1)
567         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
568         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
569         monitor = self.monitor_list()[0].proxy()
570         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
571         self.stop_proxy(self.daemon)
572         self.assertFalse(self.node_shutdown.start.called)
573
574     def test_shutdown_accepted_below_capacity(self):
575         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
576         self.busywait(lambda: 1 == self.alive_monitor_count())
577         monitor = self.monitor_list()[0].proxy()
578         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
579         self.busywait(lambda: self.node_shutdown.start.called)
580
581     def test_shutdown_declined_when_idle_and_job_queued(self):
582         size = testutil.MockSize(1)
583         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
584         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
585                      testutil.arvados_node_mock(4, job_uuid=None)]
586         self.make_daemon(cloud_nodes, arv_nodes, [size])
587         self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
588         for mon_ref in self.monitor_list():
589             monitor = mon_ref.proxy()
590             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
591                 break
592         else:
593             self.fail("monitor for idle node not found")
594         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
595         self.stop_proxy(self.daemon)
596         self.assertFalse(self.node_shutdown.start.called)
597
598     def test_node_shutdown_after_cancelled_shutdown(self):
599         cloud_node = testutil.cloud_node_mock(5)
600         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
601         self.assertEqual(1, self.alive_monitor_count())
602         monitor = self.monitor_list()[0].proxy()
603         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
604         self.last_shutdown.success.get.return_value = False
605         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
606         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
607
608         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
609         self.last_shutdown.success.get.return_value = True
610         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
611         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
612         self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count()))
613
614     def test_nodes_shutting_down_replaced_below_max_nodes(self):
615         size = testutil.MockSize(6)
616         cloud_node = testutil.cloud_node_mock(6, size=size)
617         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
618                          avail_sizes=[(size, {"cores":1})])
619         self.assertEqual(1, self.alive_monitor_count())
620         monitor = self.monitor_list()[0].proxy()
621         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
622         self.assertTrue(self.node_shutdown.start.called)
623         getmock = mock.MagicMock()
624         getmock.get.return_value = False
625         self.last_shutdown.cancel_shutdown.return_value = getmock
626         self.daemon.update_server_wishlist(
627             [testutil.MockSize(6)]).get(self.TIMEOUT)
628         self.busywait(lambda: self.node_setup.start.called)
629
630     def test_nodes_shutting_down_cancelled(self):
631         size = testutil.MockSize(6)
632         cloud_node = testutil.cloud_node_mock(6, size=size)
633         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
634                          avail_sizes=[(size, {"cores":1})])
635         self.assertEqual(1, self.alive_monitor_count())
636         monitor = self.monitor_list()[0].proxy()
637         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
638         self.assertTrue(self.node_shutdown.start.called)
639         self.daemon.update_server_wishlist(
640             [testutil.MockSize(6)]).get(self.TIMEOUT)
641         self.busywait(lambda: self.last_shutdown.cancel_shutdown.called)
642
643     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
644         cloud_node = testutil.cloud_node_mock(7)
645         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
646                          max_nodes=1)
647         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
648         monitor = self.monitor_list()[0].proxy()
649         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
650         self.assertTrue(self.node_shutdown.start.called)
651         self.daemon.update_server_wishlist(
652             [testutil.MockSize(7)]).get(self.TIMEOUT)
653         self.busywait(lambda: not self.node_setup.start.called)
654
655     def test_nodes_shutting_down_count_against_excess(self):
656         size = testutil.MockSize(8)
657         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
658         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
659         self.make_daemon(cloud_nodes, arv_nodes, [size],
660                          avail_sizes=[(size, {"cores":1})])
661         self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
662         for mon_ref in self.monitor_list():
663             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
664         self.assertEqual(1, self.node_shutdown.start.call_count)
665
666     def test_clean_shutdown_waits_for_node_setup_finish(self):
667         new_node = self.start_node_boot()
668         new_node.stop_if_no_cloud_node().get.return_value = False
669         new_node.stop_if_no_cloud_node.reset_mock()
670         self.daemon.shutdown().get(self.TIMEOUT)
671         self.assertTrue(new_node.stop_if_no_cloud_node.called)
672         self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
673         self.assertTrue(new_node.stop.called)
674         self.timer.deliver()
675         self.assertTrue(
676             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
677
678     def test_wishlist_ignored_after_shutdown(self):
679         new_node = self.start_node_boot()
680         new_node.stop_if_no_cloud_node().get.return_value = False
681         new_node.stop_if_no_cloud_node.reset_mock()
682         self.daemon.shutdown().get(self.TIMEOUT)
683         size = testutil.MockSize(2)
684         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
685         self.timer.deliver()
686         self.busywait(lambda: 1 == self.node_setup.start.call_count)
687
688     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
689         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
690         self.assertEqual(1, self.alive_monitor_count())
691         monitor = self.monitor_list()[0].proxy()
692         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
693         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
694         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
695
696     def test_idle_node_disappearing_clears_status_idle_time_counter(self):
697         size = testutil.MockSize(1)
698         status.tracker._idle_nodes = {}
699         cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
700         arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
701         self.make_daemon(cloud_nodes, arv_nodes, [size])
702         self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
703         for mon_ref in self.monitor_list():
704             monitor = mon_ref.proxy()
705             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
706                 break
707         else:
708             self.fail("monitor for idle node not found")
709         self.assertEqual(1, status.tracker.get('nodes_idle'))
710         hostname = monitor.arvados_node.get()['hostname']
711         self.assertIn(hostname, status.tracker._idle_nodes)
712         # Simulate the node disappearing from the cloud node list
713         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
714         self.busywait(lambda: 0 == self.alive_monitor_count())
715         self.assertNotIn(hostname, status.tracker._idle_nodes)
716
717     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
718         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
719         self.assertEqual(1, self.alive_monitor_count())
720         monitor = self.monitor_list()[0].proxy()
721         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
722         # We're mainly testing that update_cloud_nodes catches and handles
723         # the ActorDeadError.
724         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
725         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
726         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
727
728     def test_node_create_two_sizes(self):
729         small = testutil.MockSize(1)
730         big = testutil.MockSize(2)
731         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
732                         (testutil.MockSize(2), {"cores":2})]
733         self.make_daemon(want_sizes=[small, small, small, big],
734                          avail_sizes=avail_sizes, max_nodes=4)
735
736         # the daemon runs in another thread, so we need to wait and see
737         # if it does all the work we're expecting it to do before stopping it.
738         self.busywait(lambda: self.node_setup.start.call_count == 4)
739         booting = self.daemon.booting.get(self.TIMEOUT)
740         self.stop_proxy(self.daemon)
741         sizecounts = {a[0].id: 0 for a in avail_sizes}
742         for b in booting.itervalues():
743             sizecounts[b.cloud_size.get().id] += 1
744         logging.info(sizecounts)
745         self.assertEqual(3, sizecounts[small.id])
746         self.assertEqual(1, sizecounts[big.id])
747
748     def test_node_max_nodes_two_sizes(self):
749         small = testutil.MockSize(1)
750         big = testutil.MockSize(2)
751         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
752                         (testutil.MockSize(2), {"cores":2})]
753         self.make_daemon(want_sizes=[small, small, big, small],
754                          avail_sizes=avail_sizes, max_nodes=3)
755
756         # the daemon runs in another thread, so we need to wait and see
757         # if it does all the work we're expecting it to do before stopping it.
758         self.busywait(lambda: self.node_setup.start.call_count == 3)
759         booting = self.daemon.booting.get(self.TIMEOUT)
760         self.stop_proxy(self.daemon)
761         sizecounts = {a[0].id: 0 for a in avail_sizes}
762         for b in booting.itervalues():
763             sizecounts[b.cloud_size.get().id] += 1
764         self.assertEqual(2, sizecounts[small.id])
765         self.assertEqual(1, sizecounts[big.id])
766
767     def test_wishlist_ordering(self):
768         # Check that big nodes aren't prioritized; since #12199 containers are
769         # scheduled on specific node sizes.
770         small = testutil.MockSize(1)
771         big = testutil.MockSize(2)
772         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
773                         (testutil.MockSize(2), {"cores":2})]
774         self.make_daemon(want_sizes=[small, small, small, big],
775                          avail_sizes=avail_sizes, max_nodes=3)
776
777         # the daemon runs in another thread, so we need to wait and see
778         # if it does all the work we're expecting it to do before stopping it.
779         self.busywait(lambda: self.node_setup.start.call_count == 3)
780         booting = self.daemon.booting.get(self.TIMEOUT)
781         self.stop_proxy(self.daemon)
782         sizecounts = {a[0].id: 0 for a in avail_sizes}
783         for b in booting.itervalues():
784             sizecounts[b.cloud_size.get().id] += 1
785         self.assertEqual(3, sizecounts[small.id])
786         self.assertEqual(0, sizecounts[big.id])
787
788     def test_wishlist_reconfigure(self):
789         small = testutil.MockSize(1)
790         big = testutil.MockSize(2)
791         avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
792
793         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
794                                       testutil.cloud_node_mock(2, small),
795                                       testutil.cloud_node_mock(3, big)],
796                          arvados_nodes=[testutil.arvados_node_mock(1),
797                                         testutil.arvados_node_mock(2),
798                                         testutil.arvados_node_mock(3)],
799                          want_sizes=[small, small, big],
800                          avail_sizes=avail_sizes)
801         self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count()))
802         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
803
804         self.assertEqual(0, self.node_shutdown.start.call_count)
805
806         for c in self.daemon.cloud_nodes.get().nodes.itervalues():
807             self.daemon.node_can_shutdown(c.actor)
808
809         booting = self.daemon.booting.get()
810         cloud_nodes = self.daemon.cloud_nodes.get()
811
812         self.busywait(lambda: 1 == self.node_setup.start.call_count)
813         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
814
815         self.stop_proxy(self.daemon)
816
817         # booting a new big node
818         sizecounts = {a[0].id: 0 for a in avail_sizes}
819         for b in booting.itervalues():
820             sizecounts[b.cloud_size.get().id] += 1
821         self.assertEqual(0, sizecounts[small.id])
822         self.assertEqual(1, sizecounts[big.id])
823
824         # shutting down a small node
825         sizecounts = {a[0].id: 0 for a in avail_sizes}
826         for b in cloud_nodes.nodes.itervalues():
827             if b.shutdown_actor is not None:
828                 sizecounts[b.cloud_node.size.id] += 1
829         self.assertEqual(1, sizecounts[small.id])
830         self.assertEqual(0, sizecounts[big.id])
831
832     def test_node_max_price(self):
833         small = testutil.MockSize(1)
834         big = testutil.MockSize(2)
835         avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
836                         (testutil.MockSize(2), {"cores":2, "price":2})]
837         self.make_daemon(want_sizes=[small, small, small, big],
838                          avail_sizes=avail_sizes,
839                          max_nodes=4,
840                          max_total_price=4)
841         # the daemon runs in another thread, so we need to wait and see
842         # if it does all the work we're expecting it to do before stopping it.
843         self.busywait(lambda: self.node_setup.start.call_count == 3)
844         booting = self.daemon.booting.get()
845         self.stop_proxy(self.daemon)
846
847         sizecounts = {a[0].id: 0 for a in avail_sizes}
848         for b in booting.itervalues():
849             sizecounts[b.cloud_size.get().id] += 1
850         logging.info(sizecounts)
851
852         # Booting 3 small nodes and not booting a big node would also partially
853         # satisfy the wishlist and come in under the price cap, however the way
854         # the update_server_wishlist() currently works effectively results in a
855         # round-robin creation of one node of each size in the wishlist, so
856         # test for that.
857         self.assertEqual(2, sizecounts[small.id])
858         self.assertEqual(1, sizecounts[big.id])