10847: Daemon shutdown now stops most actors, only waits for setup actors.
[arvados.git] / services / nodemanager / tests / test_daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import time
6 import unittest
7
8 import mock
9 import pykka
10
11 import arvnodeman.daemon as nmdaemon
12 import arvnodeman.status as status
13 from arvnodeman.jobqueue import ServerCalculator
14 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
15 from . import testutil
16 from . import test_status
17 import logging
18
19 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
20                                      unittest.TestCase):
21     def mock_node_start(self, **kwargs):
22         # Make sure that every time the daemon starts a setup actor,
23         # it gets a new mock object back.
24         get_cloud_size = mock.MagicMock()
25         get_cloud_size.get.return_value = kwargs["cloud_size"]
26         mock_actor = mock.MagicMock()
27         mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
28                                           cloud_size=get_cloud_size,
29                                           actor_ref=mock_actor)
30         mock_actor.proxy.return_value = mock_proxy
31         mock_actor.tell_proxy.return_value = mock_proxy
32
33         self.last_setup = mock_proxy
34         return mock_actor
35
36     def mock_node_shutdown(self, **kwargs):
37         # Make sure that every time the daemon starts a shutdown actor,
38         # it gets a new mock object back.
39         get_cloud_node = mock.MagicMock()
40         if "node_monitor" in kwargs:
41             get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
42         mock_actor = mock.MagicMock()
43         mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
44                                           cloud_node=get_cloud_node,
45                                           actor_ref=mock_actor)
46
47         mock_actor.proxy.return_value = mock_proxy
48         self.last_shutdown = mock_proxy
49
50         return mock_actor
51
52     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
53                     avail_sizes=None,
54                     min_nodes=0, max_nodes=8,
55                     shutdown_windows=[54, 5, 1],
56                     max_total_price=None):
57         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
58             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
59
60         if not avail_sizes:
61             if cloud_nodes or want_sizes:
62                 avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
63             else:
64                 avail_sizes=[(testutil.MockSize(1), {"cores": 1})]
65
66         self.arv_factory = mock.MagicMock(name='arvados_mock')
67         api_client = mock.MagicMock(name='api_client')
68         api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
69                                                            testutil.arvados_node_mock(2)]
70         self.arv_factory.return_value = api_client
71
72         self.cloud_factory = mock.MagicMock(name='cloud_mock')
73         self.cloud_factory().node_start_time.return_value = time.time()
74         self.cloud_updates = mock.MagicMock(name='updates_mock')
75         self.timer = testutil.MockTimer(deliver_immediately=False)
76         self.cloud_factory().node_id.side_effect = lambda node: node.id
77         self.cloud_factory().broken.return_value = False
78
79         self.node_setup = mock.MagicMock(name='setup_mock')
80         self.node_setup.start.side_effect = self.mock_node_start
81         self.node_setup.reset_mock()
82
83         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
84         self.node_shutdown.start.side_effect = self.mock_node_shutdown
85
86         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
87             self.server_wishlist_poller, self.arvados_nodes_poller,
88             self.cloud_nodes_poller, self.cloud_updates, self.timer,
89             self.arv_factory, self.cloud_factory,
90             shutdown_windows, ServerCalculator(avail_sizes),
91             min_nodes, max_nodes, 600, 1800, 3600,
92             self.node_setup, self.node_shutdown,
93             max_total_price=max_total_price).proxy()
94         if arvados_nodes is not None:
95             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
96         if cloud_nodes is not None:
97             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
98         if want_sizes is not None:
99             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
100
101     def monitor_list(self):
102         return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
103
104     def monitored_arvados_nodes(self):
105         pairings = []
106         for future in [actor.proxy().arvados_node
107                        for actor in self.monitor_list()]:
108             try:
109                 pairings.append(future.get(self.TIMEOUT))
110             except pykka.ActorDeadError:
111                 pass
112         return pairings
113
114     def alive_monitor_count(self):
115         return len(self.monitored_arvados_nodes())
116
117     def assertShutdownCancellable(self, expected=True):
118         self.assertTrue(self.node_shutdown.start.called)
119         self.assertIs(expected,
120                       self.node_shutdown.start.call_args[1]['cancellable'],
121                       "ComputeNodeShutdownActor incorrectly cancellable")
122
123     def test_easy_node_creation(self):
124         size = testutil.MockSize(1)
125         self.make_daemon(want_sizes=[size])
126         self.stop_proxy(self.daemon)
127         self.assertTrue(self.node_setup.start.called)
128
129     def check_monitors_arvados_nodes(self, *arv_nodes):
130         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
131
132     def test_node_pairing(self):
133         cloud_node = testutil.cloud_node_mock(1)
134         arv_node = testutil.arvados_node_mock(1)
135         self.make_daemon([cloud_node], [arv_node])
136         self.stop_proxy(self.daemon)
137         self.check_monitors_arvados_nodes(arv_node)
138
139     def test_node_pairing_after_arvados_update(self):
140         cloud_node = testutil.cloud_node_mock(2)
141         self.make_daemon([cloud_node],
142                          [testutil.arvados_node_mock(1, ip_address=None)])
143         arv_node = testutil.arvados_node_mock(2)
144         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
145         self.stop_proxy(self.daemon)
146         self.check_monitors_arvados_nodes(arv_node)
147
148     def test_arvados_node_un_and_re_paired(self):
149         # We need to create the Arvados node mock after spinning up the daemon
150         # to make sure it's new enough to pair with the cloud node.
151         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
152                          arvados_nodes=None)
153         arv_node = testutil.arvados_node_mock(3)
154         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
155         self.check_monitors_arvados_nodes(arv_node)
156         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
157         self.assertEqual(0, self.alive_monitor_count())
158         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
159         self.stop_proxy(self.daemon)
160         self.check_monitors_arvados_nodes(arv_node)
161
162     def test_old_arvados_node_not_double_assigned(self):
163         arv_node = testutil.arvados_node_mock(3, age=9000)
164         size = testutil.MockSize(3)
165         self.make_daemon(arvados_nodes=[arv_node],
166                          avail_sizes=[(size, {"cores":1})])
167         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
168         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
169         self.stop_proxy(self.daemon)
170         used_nodes = [call[1].get('arvados_node')
171                       for call in self.node_setup.start.call_args_list]
172         self.assertEqual(2, len(used_nodes))
173         self.assertIn(arv_node, used_nodes)
174         self.assertIn(None, used_nodes)
175
176     def test_node_count_satisfied(self):
177         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
178                          want_sizes=[testutil.MockSize(1)])
179         self.stop_proxy(self.daemon)
180         self.assertFalse(self.node_setup.start.called)
181
182     def test_dont_count_missing_as_busy(self):
183         size = testutil.MockSize(1)
184         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
185                                       testutil.cloud_node_mock(2, size=size)],
186                          arvados_nodes=[testutil.arvados_node_mock(1),
187                                         testutil.arvados_node_mock(
188                                             2,
189                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
190                          want_sizes=[size, size])
191         self.stop_proxy(self.daemon)
192         self.assertTrue(self.node_setup.start.called)
193
194     def test_missing_counts_towards_max(self):
195         size = testutil.MockSize(1)
196         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
197                                       testutil.cloud_node_mock(2, size=size)],
198                          arvados_nodes=[testutil.arvados_node_mock(1),
199                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
200                          want_sizes=[size, size],
201                          max_nodes=2)
202         self.stop_proxy(self.daemon)
203         self.assertFalse(self.node_setup.start.called)
204
205     def test_excess_counts_missing(self):
206         size = testutil.MockSize(1)
207         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
208         self.make_daemon(cloud_nodes=cloud_nodes,
209                          arvados_nodes=[testutil.arvados_node_mock(1),
210                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
211                          want_sizes=[size])
212         self.assertEqual(2, self.alive_monitor_count())
213         for mon_ref in self.monitor_list():
214             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
215         self.assertEqual(1, self.node_shutdown.start.call_count)
216
217     def test_missing_shutdown_not_excess(self):
218         size = testutil.MockSize(1)
219         cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
220         self.make_daemon(cloud_nodes=cloud_nodes,
221                          arvados_nodes=[testutil.arvados_node_mock(1),
222                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
223                          want_sizes=[size])
224
225         get_cloud_node = mock.MagicMock(name="get_cloud_node")
226         get_cloud_node.get.return_value = cloud_nodes[1]
227         mock_node_monitor = mock.MagicMock()
228         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
229         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
230
231         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
232
233         self.assertEqual(2, self.alive_monitor_count())
234         for mon_ref in self.monitor_list():
235             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
236         self.assertEqual(1, self.node_shutdown.start.call_count)
237
238     def test_booting_nodes_counted(self):
239         cloud_node = testutil.cloud_node_mock(1)
240         arv_node = testutil.arvados_node_mock(1)
241         server_wishlist = [testutil.MockSize(1)] * 2
242         self.make_daemon([cloud_node], [arv_node], server_wishlist)
243         self.daemon.max_nodes.get(self.TIMEOUT)
244         self.assertTrue(self.node_setup.start.called)
245         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
246         self.stop_proxy(self.daemon)
247         self.assertEqual(1, self.node_setup.start.call_count)
248
249     def test_boot_new_node_when_all_nodes_busy(self):
250         size = testutil.MockSize(2)
251         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
252         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
253                          [size], avail_sizes=[(size, {"cores":1})])
254         self.busywait(lambda: self.node_setup.start.called)
255         self.stop_proxy(self.daemon)
256         self.assertTrue(self.node_setup.start.called)
257
258     def test_boot_new_node_below_min_nodes(self):
259         min_size = testutil.MockSize(1)
260         wish_size = testutil.MockSize(3)
261         avail_sizes = [(min_size, {"cores": 1}),
262                        (wish_size, {"cores": 3})]
263         self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
264         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
265         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
266         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
267         self.stop_proxy(self.daemon)
268         self.assertEqual([wish_size, min_size],
269                          [call[1].get('cloud_size')
270                           for call in self.node_setup.start.call_args_list])
271
272     def test_no_new_node_when_ge_min_nodes_busy(self):
273         size = testutil.MockSize(2)
274         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
275         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
276                      for n in range(1, 4)]
277         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
278         self.stop_proxy(self.daemon)
279         self.assertEqual(0, self.node_setup.start.call_count)
280
281     def test_no_new_node_when_max_nodes_busy(self):
282         size = testutil.MockSize(3)
283         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
284                          arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
285                          want_sizes=[size],
286                          max_nodes=1)
287         self.stop_proxy(self.daemon)
288         self.assertFalse(self.node_setup.start.called)
289
290     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
291         if cloud_node is None:
292             cloud_node = testutil.cloud_node_mock(id_num)
293         id_num = int(cloud_node.id)
294         if arv_node is None:
295             arv_node = testutil.arvados_node_mock(id_num)
296         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
297                          avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
298         self.daemon.max_nodes.get(self.TIMEOUT)
299         self.assertEqual(1, self.node_setup.start.call_count)
300         self.last_setup.cloud_node.get.return_value = cloud_node
301         self.last_setup.arvados_node.get.return_value = arv_node
302         return self.last_setup
303
304     def test_new_node_when_booted_node_not_usable(self):
305         cloud_node = testutil.cloud_node_mock(4)
306         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
307         setup = self.start_node_boot(cloud_node, arv_node)
308         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
309         self.assertEqual(1, self.alive_monitor_count())
310         self.daemon.update_arvados_nodes([arv_node])
311         self.daemon.update_cloud_nodes([cloud_node])
312         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
313         self.daemon.update_server_wishlist(
314             [testutil.MockSize(4)]).get(self.TIMEOUT)
315         self.stop_proxy(self.daemon)
316         self.assertEqual(2, self.node_setup.start.call_count)
317
318     def test_no_duplication_when_booting_node_listed_fast(self):
319         # Test that we don't start two ComputeNodeMonitorActors when
320         # we learn about a booting node through a listing before we
321         # get the "node up" message from CloudNodeSetupActor.
322         cloud_node = testutil.cloud_node_mock(1)
323         setup = self.start_node_boot(cloud_node)
324         self.daemon.update_cloud_nodes([cloud_node])
325         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
326         self.assertEqual(1, self.alive_monitor_count())
327
328     def test_no_duplication_when_booted_node_listed(self):
329         cloud_node = testutil.cloud_node_mock(2)
330         setup = self.start_node_boot(cloud_node, id_num=2)
331         self.daemon.node_setup_finished(setup)
332         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
333         self.assertEqual(1, self.alive_monitor_count())
334
335     def test_node_counted_after_boot_with_slow_listing(self):
336         # Test that, after we boot a compute node, we assume it exists
337         # even it doesn't appear in the listing (e.g., because of delays
338         # propagating tags).
339         setup = self.start_node_boot()
340         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
341         self.assertEqual(1, self.alive_monitor_count())
342         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
343         self.assertEqual(1, self.alive_monitor_count())
344
345     def test_booted_unlisted_node_counted(self):
346         setup = self.start_node_boot(id_num=1)
347         self.daemon.node_setup_finished(setup)
348         self.daemon.update_server_wishlist(
349             [testutil.MockSize(1)]).get(self.TIMEOUT)
350         self.stop_proxy(self.daemon)
351         self.assertEqual(1, self.node_setup.start.call_count)
352
353     def test_booted_node_can_shutdown(self):
354         setup = self.start_node_boot()
355         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
356         self.assertEqual(1, self.alive_monitor_count())
357         monitor = self.monitor_list()[0].proxy()
358         self.daemon.update_server_wishlist([])
359         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
360         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
361         self.stop_proxy(self.daemon)
362         self.assertTrue(self.node_shutdown.start.called,
363                         "daemon did not shut down booted node on offer")
364
365         with test_status.TestServer() as srv:
366             self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
367             self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
368             self.assertEqual(0, srv.get_status().get('nodes_wish', None))
369
370     def test_booted_node_lifecycle(self):
371         cloud_node = testutil.cloud_node_mock(6)
372         setup = self.start_node_boot(cloud_node, id_num=6)
373         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
374         self.assertEqual(1, self.alive_monitor_count())
375         monitor = self.monitor_list()[0].proxy()
376         self.daemon.update_server_wishlist([])
377         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
378         self.assertShutdownCancellable(True)
379         shutdown = self.node_shutdown.start().proxy()
380         shutdown.cloud_node.get.return_value = cloud_node
381         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
382         self.daemon.update_cloud_nodes([])
383         self.assertTrue(shutdown.stop.called,
384                         "shutdown actor not stopped after finishing")
385         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
386                         "monitor for booted node not stopped after shutdown")
387         self.daemon.update_server_wishlist(
388             [testutil.MockSize(2)]).get(self.TIMEOUT)
389         self.stop_proxy(self.daemon)
390         self.assertTrue(self.node_setup.start.called,
391                         "second node not started after booted node stopped")
392
393     def test_booted_node_shut_down_when_never_listed(self):
394         setup = self.start_node_boot()
395         self.cloud_factory().node_start_time.return_value = time.time() - 3601
396         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
397         self.assertEqual(1, self.alive_monitor_count())
398         self.assertFalse(self.node_shutdown.start.called)
399         now = time.time()
400         self.monitor_list()[0].tell_proxy().consider_shutdown()
401         self.busywait(lambda: self.node_shutdown.start.called)
402         self.stop_proxy(self.daemon)
403         self.assertShutdownCancellable(False)
404
405     def test_booted_node_shut_down_when_never_paired(self):
406         cloud_node = testutil.cloud_node_mock(2)
407         setup = self.start_node_boot(cloud_node)
408         self.cloud_factory().node_start_time.return_value = time.time() - 3601
409         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
410         self.assertEqual(1, self.alive_monitor_count())
411         self.daemon.update_cloud_nodes([cloud_node])
412         self.monitor_list()[0].tell_proxy().consider_shutdown()
413         self.busywait(lambda: self.node_shutdown.start.called)
414         self.stop_proxy(self.daemon)
415         self.assertShutdownCancellable(False)
416
417     def test_booted_node_shut_down_when_never_working(self):
418         cloud_node = testutil.cloud_node_mock(4)
419         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
420         setup = self.start_node_boot(cloud_node, arv_node)
421         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
422         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
423         self.assertEqual(1, self.alive_monitor_count())
424         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
425         self.daemon.update_cloud_nodes([cloud_node])
426         self.busywait(lambda: self.node_shutdown.start.called)
427         self.stop_proxy(self.daemon)
428         self.assertShutdownCancellable(False)
429
430     def test_node_that_pairs_not_considered_failed_boot(self):
431         cloud_node = testutil.cloud_node_mock(3)
432         arv_node = testutil.arvados_node_mock(3)
433         setup = self.start_node_boot(cloud_node, arv_node)
434         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
435         self.assertEqual(1, self.alive_monitor_count())
436         self.daemon.update_cloud_nodes([cloud_node])
437         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
438         self.timer.deliver()
439         self.stop_proxy(self.daemon)
440         self.assertFalse(self.node_shutdown.start.called)
441
442     def test_node_that_pairs_busy_not_considered_failed_boot(self):
443         cloud_node = testutil.cloud_node_mock(5)
444         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
445         setup = self.start_node_boot(cloud_node, arv_node)
446         self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
447         self.assertEqual(1, self.alive_monitor_count())
448         self.daemon.update_cloud_nodes([cloud_node])
449         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
450         self.timer.deliver()
451         self.stop_proxy(self.daemon)
452         self.assertFalse(self.node_shutdown.start.called)
453
454     def test_booting_nodes_shut_down(self):
455         self.make_daemon(want_sizes=[testutil.MockSize(1)])
456         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
457         self.stop_proxy(self.daemon)
458         self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
459
460     def test_all_booting_nodes_tried_to_shut_down(self):
461         size = testutil.MockSize(2)
462         self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
463         self.daemon.max_nodes.get(self.TIMEOUT)
464         setup1 = self.last_setup
465         setup1.stop_if_no_cloud_node().get.return_value = False
466         setup1.stop_if_no_cloud_node.reset_mock()
467         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
468         self.daemon.max_nodes.get(self.TIMEOUT)
469         self.assertIsNot(setup1, self.last_setup)
470         self.last_setup.stop_if_no_cloud_node().get.return_value = True
471         self.last_setup.stop_if_no_cloud_node.reset_mock()
472         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
473         self.daemon.max_nodes.get(self.TIMEOUT)
474         self.stop_proxy(self.daemon)
475         self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
476         self.assertTrue(setup1.stop_if_no_cloud_node.called)
477
478     def test_shutdown_declined_at_wishlist_capacity(self):
479         cloud_node = testutil.cloud_node_mock(1)
480         arv_node = testutil.arvados_node_mock(1)
481         size = testutil.MockSize(1)
482         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
483         self.assertEqual(1, self.alive_monitor_count())
484         monitor = self.monitor_list()[0].proxy()
485         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
486         self.stop_proxy(self.daemon)
487         self.assertFalse(self.node_shutdown.start.called)
488
489     def test_shutdown_declined_below_min_nodes(self):
490         cloud_node = testutil.cloud_node_mock(1)
491         arv_node = testutil.arvados_node_mock(1)
492         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
493         self.assertEqual(1, self.alive_monitor_count())
494         monitor = self.monitor_list()[0].proxy()
495         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
496         self.stop_proxy(self.daemon)
497         self.assertFalse(self.node_shutdown.start.called)
498
499     def test_shutdown_accepted_below_capacity(self):
500         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
501         self.assertEqual(1, self.alive_monitor_count())
502         monitor = self.monitor_list()[0].proxy()
503         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
504         self.stop_proxy(self.daemon)
505         self.assertTrue(self.node_shutdown.start.called)
506
507     def test_shutdown_declined_when_idle_and_job_queued(self):
508         size = testutil.MockSize(1)
509         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
510         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
511                      testutil.arvados_node_mock(4, job_uuid=None)]
512         self.make_daemon(cloud_nodes, arv_nodes, [size])
513         self.assertEqual(2, self.alive_monitor_count())
514         for mon_ref in self.monitor_list():
515             monitor = mon_ref.proxy()
516             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
517                 break
518         else:
519             self.fail("monitor for idle node not found")
520         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
521         self.stop_proxy(self.daemon)
522         self.assertFalse(self.node_shutdown.start.called)
523
524     def test_node_shutdown_after_cancelled_shutdown(self):
525         cloud_node = testutil.cloud_node_mock(5)
526         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
527         self.assertEqual(1, self.alive_monitor_count())
528         monitor = self.monitor_list()[0].proxy()
529         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
530         self.last_shutdown.success.get.return_value = False
531         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
532         self.assertEqual(1, self.alive_monitor_count())
533
534         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
535         self.last_shutdown.success.get.return_value = True
536         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
537         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
538         self.assertEqual(0, self.alive_monitor_count())
539
540     def test_nodes_shutting_down_replaced_below_max_nodes(self):
541         size = testutil.MockSize(6)
542         cloud_node = testutil.cloud_node_mock(6, size=size)
543         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
544                          avail_sizes=[(size, {"cores":1})])
545         self.assertEqual(1, self.alive_monitor_count())
546         monitor = self.monitor_list()[0].proxy()
547         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
548         self.assertTrue(self.node_shutdown.start.called)
549         self.daemon.update_server_wishlist(
550             [testutil.MockSize(6)]).get(self.TIMEOUT)
551         self.stop_proxy(self.daemon)
552         self.assertTrue(self.node_setup.start.called)
553
554     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
555         cloud_node = testutil.cloud_node_mock(7)
556         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
557                          max_nodes=1)
558         self.assertEqual(1, self.alive_monitor_count())
559         monitor = self.monitor_list()[0].proxy()
560         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
561         self.assertTrue(self.node_shutdown.start.called)
562         self.daemon.update_server_wishlist(
563             [testutil.MockSize(7)]).get(self.TIMEOUT)
564         self.stop_proxy(self.daemon)
565         self.assertFalse(self.node_setup.start.called)
566
567     def test_nodes_shutting_down_count_against_excess(self):
568         size = testutil.MockSize(8)
569         cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
570         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
571         self.make_daemon(cloud_nodes, arv_nodes, [size],
572                          avail_sizes=[(size, {"cores":1})])
573         self.assertEqual(2, self.alive_monitor_count())
574         for mon_ref in self.monitor_list():
575             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
576         self.assertEqual(1, self.node_shutdown.start.call_count)
577
578     def test_clean_shutdown_waits_for_node_setup_finish(self):
579         new_node = self.start_node_boot()
580         new_node.stop_if_no_cloud_node().get.return_value = False
581         new_node.stop_if_no_cloud_node.reset_mock()
582         self.daemon.shutdown().get(self.TIMEOUT)
583         self.assertTrue(new_node.stop_if_no_cloud_node.called)
584         self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
585         self.assertTrue(new_node.stop.called)
586         self.timer.deliver()
587         self.assertTrue(
588             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
589
590     def test_wishlist_ignored_after_shutdown(self):
591         new_node = self.start_node_boot()
592         new_node.stop_if_no_cloud_node().get.return_value = False
593         new_node.stop_if_no_cloud_node.reset_mock()
594         self.daemon.shutdown().get(self.TIMEOUT)
595         size = testutil.MockSize(2)
596         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
597         self.timer.deliver()
598         self.stop_proxy(self.daemon)
599         self.assertEqual(1, self.node_setup.start.call_count)
600
601     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
602         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
603         self.assertEqual(1, self.alive_monitor_count())
604         monitor = self.monitor_list()[0].proxy()
605         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
606         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
607         self.stop_proxy(self.daemon)
608         self.assertEqual(
609             1, self.last_shutdown.stop.call_count)
610
611     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
612         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
613         self.assertEqual(1, self.alive_monitor_count())
614         monitor = self.monitor_list()[0].proxy()
615         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
616         # We're mainly testing that update_cloud_nodes catches and handles
617         # the ActorDeadError.
618         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
619         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
620         self.stop_proxy(self.daemon)
621         self.assertEqual(1, self.last_shutdown.stop.call_count)
622
623     def test_node_create_two_sizes(self):
624         small = testutil.MockSize(1)
625         big = testutil.MockSize(2)
626         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
627                         (testutil.MockSize(2), {"cores":2})]
628         self.make_daemon(want_sizes=[small, small, small, big],
629                          avail_sizes=avail_sizes, max_nodes=4)
630
631         # the daemon runs in another thread, so we need to wait and see
632         # if it does all the work we're expecting it to do before stopping it.
633         self.busywait(lambda: self.node_setup.start.call_count == 4)
634         booting = self.daemon.booting.get(self.TIMEOUT)
635         self.stop_proxy(self.daemon)
636         sizecounts = {a[0].id: 0 for a in avail_sizes}
637         for b in booting.itervalues():
638             sizecounts[b.cloud_size.get().id] += 1
639         logging.info(sizecounts)
640         self.assertEqual(3, sizecounts[small.id])
641         self.assertEqual(1, sizecounts[big.id])
642
643     def test_node_max_nodes_two_sizes(self):
644         small = testutil.MockSize(1)
645         big = testutil.MockSize(2)
646         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
647                         (testutil.MockSize(2), {"cores":2})]
648         self.make_daemon(want_sizes=[small, small, small, big],
649                          avail_sizes=avail_sizes, max_nodes=3)
650
651         # the daemon runs in another thread, so we need to wait and see
652         # if it does all the work we're expecting it to do before stopping it.
653         self.busywait(lambda: self.node_setup.start.call_count == 3)
654         booting = self.daemon.booting.get(self.TIMEOUT)
655         self.stop_proxy(self.daemon)
656         sizecounts = {a[0].id: 0 for a in avail_sizes}
657         for b in booting.itervalues():
658             sizecounts[b.cloud_size.get().id] += 1
659         self.assertEqual(2, sizecounts[small.id])
660         self.assertEqual(1, sizecounts[big.id])
661
662     def test_wishlist_reconfigure(self):
663         small = testutil.MockSize(1)
664         big = testutil.MockSize(2)
665         avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
666
667         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
668                                       testutil.cloud_node_mock(2, small),
669                                       testutil.cloud_node_mock(3, big)],
670                          arvados_nodes=[testutil.arvados_node_mock(1),
671                                         testutil.arvados_node_mock(2),
672                                         testutil.arvados_node_mock(3)],
673                          want_sizes=[small, small, big],
674                          avail_sizes=avail_sizes)
675
676         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
677
678         self.assertEqual(0, self.node_shutdown.start.call_count)
679
680         for c in self.daemon.cloud_nodes.get().nodes.itervalues():
681             self.daemon.node_can_shutdown(c.actor)
682
683         booting = self.daemon.booting.get()
684         cloud_nodes = self.daemon.cloud_nodes.get()
685
686         self.stop_proxy(self.daemon)
687
688         self.assertEqual(1, self.node_setup.start.call_count)
689         self.assertEqual(1, self.node_shutdown.start.call_count)
690
691         # booting a new big node
692         sizecounts = {a[0].id: 0 for a in avail_sizes}
693         for b in booting.itervalues():
694             sizecounts[b.cloud_size.get().id] += 1
695         self.assertEqual(0, sizecounts[small.id])
696         self.assertEqual(1, sizecounts[big.id])
697
698         # shutting down a small node
699         sizecounts = {a[0].id: 0 for a in avail_sizes}
700         for b in cloud_nodes.nodes.itervalues():
701             if b.shutdown_actor is not None:
702                 sizecounts[b.cloud_node.size.id] += 1
703         self.assertEqual(1, sizecounts[small.id])
704         self.assertEqual(0, sizecounts[big.id])
705
706     def test_node_max_price(self):
707         small = testutil.MockSize(1)
708         big = testutil.MockSize(2)
709         avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
710                         (testutil.MockSize(2), {"cores":2, "price":2})]
711         self.make_daemon(want_sizes=[small, small, small, big],
712                          avail_sizes=avail_sizes,
713                          max_nodes=4,
714                          max_total_price=4)
715         # the daemon runs in another thread, so we need to wait and see
716         # if it does all the work we're expecting it to do before stopping it.
717         self.busywait(lambda: self.node_setup.start.call_count == 3)
718         booting = self.daemon.booting.get()
719         self.stop_proxy(self.daemon)
720
721         sizecounts = {a[0].id: 0 for a in avail_sizes}
722         for b in booting.itervalues():
723             sizecounts[b.cloud_size.get().id] += 1
724         logging.info(sizecounts)
725
726         # Booting 3 small nodes and not booting a big node would also partially
727         # satisfy the wishlist and come in under the price cap, however the way
728         # the update_server_wishlist() currently works effectively results in a
729         # round-robin creation of one node of each size in the wishlist, so
730         # test for that.
731         self.assertEqual(2, sizecounts[small.id])
732         self.assertEqual(1, sizecounts[big.id])