3 from __future__ import absolute_import, print_function
11 import arvnodeman.daemon as nmdaemon
12 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
13 from . import testutil
15 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
def new_setup_proxy(self):
    """Return a brand-new setup proxy mock and remember it as last_setup.

    Installed as the side_effect of ``node_setup.start().proxy`` so every
    setup actor the daemon starts is represented by its own mock object.
    """
    fresh_proxy = mock.MagicMock(name='setup_proxy_mock')
    self.last_setup = fresh_proxy
    return fresh_proxy
def make_daemon(self, cloud_nodes=(), arvados_nodes=(), want_sizes=(),
                min_size=testutil.MockSize(1), min_nodes=0, max_nodes=8):
    """Start a NodeManagerDaemonActor wired to fresh mock collaborators.

    The mocks are stored on self (``cloud_nodes_poller``,
    ``arvados_nodes_poller``, ``server_wishlist_poller``, ``arv_factory``,
    ``cloud_factory``, ``cloud_updates``, ``timer``, ``node_setup``,
    ``node_shutdown``) and the daemon proxy on ``self.daemon``.

    cloud_nodes, arvados_nodes, want_sizes: initial updates sent to the
    daemon, each waited on before returning.  Pass None to skip sending
    that update.  The defaults are empty tuples rather than lists so no
    mutable default object is shared between calls; a new list is sent
    to the daemon every time.
    min_size, min_nodes, max_nodes: forwarded to the daemon constructor.
    """
    for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
        setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
    self.arv_factory = mock.MagicMock(name='arvados_mock')
    self.cloud_factory = mock.MagicMock(name='cloud_mock')
    self.cloud_factory().node_start_time.return_value = time.time()
    self.cloud_updates = mock.MagicMock(name='updates_mock')
    self.timer = testutil.MockTimer(deliver_immediately=False)
    self.node_setup = mock.MagicMock(name='setup_mock')
    # Each setup actor the daemon starts yields its own proxy mock via
    # new_setup_proxy; the reset clears the configuring start() call
    # recorded just above so tests only see calls made by the daemon.
    self.node_setup.start().proxy.side_effect = self.new_setup_proxy
    self.node_setup.reset_mock()
    self.node_shutdown = mock.MagicMock(name='shutdown_mock')
    # NOTE(review): the bare positional values ([54, 5, 1], 600, 1800,
    # 3600) are interpreted by NodeManagerDaemonActor.__init__; confirm
    # their meaning there before changing them.
    self.daemon = nmdaemon.NodeManagerDaemonActor.start(
        self.server_wishlist_poller, self.arvados_nodes_poller,
        self.cloud_nodes_poller, self.cloud_updates, self.timer,
        self.arv_factory, self.cloud_factory,
        [54, 5, 1], min_size, min_nodes, max_nodes, 600, 1800, 3600,
        self.node_setup, self.node_shutdown).proxy()
    # None means "don't send this update"; otherwise send a fresh list.
    if cloud_nodes is not None:
        self.daemon.update_cloud_nodes(list(cloud_nodes)).get(self.TIMEOUT)
    if arvados_nodes is not None:
        self.daemon.update_arvados_nodes(
            list(arvados_nodes)).get(self.TIMEOUT)
    if want_sizes is not None:
        self.daemon.update_server_wishlist(
            list(want_sizes)).get(self.TIMEOUT)
def monitor_list(self):
    """Return refs to every ComputeNodeMonitorActor in pykka's registry."""
    registry = pykka.ActorRegistry
    return registry.get_by_class(ComputeNodeMonitorActor)
def alive_monitor_count(self):
    """Count the monitor actor refs from monitor_list() that are alive."""
    alive_refs = [ref for ref in self.monitor_list() if ref.is_alive()]
    return len(alive_refs)
def assertShutdownCancellable(self, expected=True):
    """Assert a shutdown actor was started with ``cancellable=expected``.

    Reads the ``cancellable`` keyword argument of the most recent
    ``node_shutdown.start`` call.  The failure message states the
    expectation in both directions; the old message always claimed the
    actor was "incorrectly cancellable", even when it was not.
    """
    self.assertTrue(self.node_shutdown.start.called)
    self.assertIs(expected,
                  self.node_shutdown.start.call_args[1]['cancellable'],
                  "ComputeNodeShutdownActor should {}be cancellable".format(
                      '' if expected else 'not '))
def test_easy_node_creation(self):
    """A single wished-for size makes the daemon start a setup actor."""
    self.make_daemon(want_sizes=[testutil.MockSize(1)])
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_setup.start.called)
def check_monitors_arvados_nodes(self, *arv_nodes):
    """Assert the live monitors are paired with exactly ``arv_nodes``.

    Order does not matter; monitors that are no longer alive are ignored.
    """
    futures = []
    for monitor_ref in self.monitor_list():
        if monitor_ref.is_alive():
            futures.append(monitor_ref.proxy().arvados_node)
    self.assertItemsEqual(arv_nodes, pykka.get_all(futures, self.TIMEOUT))
def test_node_pairing(self):
    """Cloud and Arvados nodes built from the same id number get paired."""
    cloud = testutil.cloud_node_mock(1)
    arv = testutil.arvados_node_mock(1)
    self.make_daemon(cloud_nodes=[cloud], arvados_nodes=[arv])
    self.stop_proxy(self.daemon)
    self.check_monitors_arvados_nodes(arv)
def test_node_pairing_after_arvados_update(self):
    """Pairing happens once an Arvados update replaces an IP-less record."""
    cloud = testutil.cloud_node_mock(2)
    ipless_record = testutil.arvados_node_mock(2, ip_address=None)
    self.make_daemon(cloud_nodes=[cloud], arvados_nodes=[ipless_record])
    updated_record = testutil.arvados_node_mock(2)
    self.daemon.update_arvados_nodes([updated_record]).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.check_monitors_arvados_nodes(updated_record)
def test_arvados_node_un_and_re_paired(self):
    """Dropping and then re-listing a cloud node re-pairs its Arvados node."""
    arv_node = testutil.arvados_node_mock(3)
    self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
                     arvados_nodes=[arv_node])
    self.check_monitors_arvados_nodes(arv_node)
    # An empty cloud listing should leave no monitor alive.
    self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
    self.assertEqual(0, self.alive_monitor_count())
    relisted = testutil.cloud_node_mock(3)
    self.daemon.update_cloud_nodes([relisted])
    self.stop_proxy(self.daemon)
    self.check_monitors_arvados_nodes(arv_node)
def test_old_arvados_node_not_double_assigned(self):
    """An old Arvados record is handed to at most one new setup actor."""
    stale_record = testutil.arvados_node_mock(3, age=9000)
    size = testutil.MockSize(3)
    self.make_daemon(arvados_nodes=[stale_record])
    for wishlist in ([size], [size, size]):
        self.daemon.update_server_wishlist(wishlist).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    start_calls = self.node_setup.start.call_args_list
    assigned = [entry[1].get('arvados_node') for entry in start_calls]
    self.assertEqual(2, len(assigned))
    self.assertIn(stale_record, assigned)
    self.assertIn(None, assigned)
def test_node_count_satisfied(self):
    """No new node is booted when an existing node covers the wishlist."""
    self.make_daemon([testutil.cloud_node_mock()],
                     want_sizes=[testutil.MockSize(1)])
    self.stop_proxy(self.daemon)
    # Setup actors are launched through node_setup.start(...), so the
    # parent mock's own ``called`` flag is never set.  Asserting on
    # ``node_setup.called`` (as before) could never fail.
    self.assertFalse(self.node_setup.start.called)
def test_booting_nodes_counted(self):
    """A node that is still booting counts toward the wishlist."""
    cloud = testutil.cloud_node_mock(1)
    arv = testutil.arvados_node_mock(1)
    wishlist = [testutil.MockSize(1)] * 2
    self.make_daemon([cloud], [arv], wishlist)
    # Round-trip an attribute read through the proxy as a sync point
    # before inspecting the mocks.
    self.daemon.max_nodes.get(self.TIMEOUT)
    self.assertTrue(self.node_setup.start.called)
    # Re-sending the same wishlist must not boot another node.
    self.daemon.update_server_wishlist(wishlist).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertEqual(1, self.node_setup.start.call_count)
def test_boot_new_node_when_all_nodes_busy(self):
    """A busy (job-assigned) node does not satisfy the wishlist."""
    busy_record = testutil.arvados_node_mock(2, job_uuid=True)
    self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(2)],
                     arvados_nodes=[busy_record],
                     want_sizes=[testutil.MockSize(2)])
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_setup.start.called)
def test_boot_new_node_below_min_nodes(self):
    """Below min_nodes, a min_size node is booted after the wished one."""
    min_size = testutil.MockSize(1)
    wish_size = testutil.MockSize(3)
    self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
    self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
    self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
    self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    started_sizes = [entry[1].get('cloud_size')
                     for entry in self.node_setup.start.call_args_list]
    self.assertEqual([wish_size, min_size], started_sizes)
def test_no_new_node_when_ge_min_nodes_busy(self):
    """Busy nodes at or above min_nodes do not trigger extra boots."""
    node_ids = range(1, 4)
    cloud_nodes = [testutil.cloud_node_mock(n) for n in node_ids]
    arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
                 for n in node_ids]
    self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
    self.stop_proxy(self.daemon)
    self.assertEqual(0, self.node_setup.start.call_count)
155 def test_no_new_node_when_max_nodes_busy(self):
156 self.make_daemon([testutil.cloud_node_mock(3)],
157 [testutil.arvados_node_mock(3, job_uuid=True)],
158 [testutil.MockSize(3)],
160 self.stop_proxy(self.daemon)
161 self.assertFalse(self.node_setup.start.called)
def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
    """Start a daemon that boots one node; return that node's setup mock.

    cloud_node, arv_node: the nodes the returned setup mock will report.
    When None, mocks numbered ``id_num`` are created.  A caller-supplied
    arv_node is no longer overwritten.
    id_num: id for the default mocks and for the wished-for MockSize.
    """
    if cloud_node is None:
        cloud_node = testutil.cloud_node_mock(id_num)
    if arv_node is None:
        arv_node = testutil.arvados_node_mock(id_num)
    self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
    # Round-trip an attribute read through the proxy as a sync point
    # before inspecting the setup mock.
    self.daemon.max_nodes.get(self.TIMEOUT)
    self.assertEqual(1, self.node_setup.start.call_count)
    self.last_setup.cloud_node.get.return_value = cloud_node
    self.last_setup.arvados_node.get.return_value = arv_node
    return self.last_setup
def test_no_duplication_when_booting_node_listed_fast(self):
    """Only one monitor exists when a listing beats the node_up message.

    The daemon learns about the booting node from a cloud listing before
    the setup actor reports it up; that must not create a second
    ComputeNodeMonitorActor.
    """
    booting = testutil.cloud_node_mock(1)
    setup = self.start_node_boot(booting)
    self.daemon.update_cloud_nodes([booting])
    self.daemon.node_up(setup).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
def test_no_duplication_when_booted_node_listed(self):
    """Listing an already-up node does not create a second monitor."""
    booted = testutil.cloud_node_mock(2)
    setup = self.start_node_boot(booted, id_num=2)
    self.daemon.node_up(setup)
    self.daemon.update_cloud_nodes([booted]).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
def test_node_counted_after_boot_with_slow_listing(self):
    """A booted node is assumed to exist before it appears in a listing.

    After node_up, the node's monitor must survive a cloud listing that
    does not include it yet (e.g., because the listing lags the boot).
    """
    setup = self.start_node_boot()
    self.daemon.node_up(setup).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
    self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
def test_booted_unlisted_node_counted(self):
    """An up-but-unlisted node still satisfies a matching wishlist."""
    setup = self.start_node_boot(id_num=1)
    self.daemon.node_up(setup)
    wishlist = [testutil.MockSize(1)]
    self.daemon.update_server_wishlist(wishlist).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertEqual(1, self.node_setup.start.call_count)
def test_booted_node_can_shutdown(self):
    """With an empty wishlist, a booted node's shutdown offer is taken."""
    setup = self.start_node_boot()
    self.daemon.node_up(setup).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
    monitor_ref = self.monitor_list()[0]
    monitor = monitor_ref.proxy()
    self.daemon.update_server_wishlist([])
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_shutdown.start.called,
                    "daemon did not shut down booted node on offer")
def test_booted_node_lifecycle(self):
    """Boot a node, shut it down, then check a replacement gets booted."""
    cloud_node = testutil.cloud_node_mock(6)
    setup = self.start_node_boot(cloud_node, id_num=6)
    self.daemon.node_up(setup).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    # Empty the wishlist so the daemon accepts the shutdown offer.
    self.daemon.update_server_wishlist([])
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.assertShutdownCancellable(True)
    shutdown = self.node_shutdown.start().proxy()
    shutdown.cloud_node.get.return_value = cloud_node
    self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
    self.assertTrue(shutdown.stop.called,
                    "shutdown actor not stopped after finishing")
    self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
                    "monitor for booted node not stopped after shutdown")
    self.daemon.update_server_wishlist(
        [testutil.MockSize(2)]).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    # start_node_boot already made one start() call, so checking only
    # ``start.called`` could never fail.  Require the second boot.
    self.assertEqual(2, self.node_setup.start.call_count,
                     "second node not started after booted node stopped")
243 def test_booted_node_shut_down_when_never_listed(self):
244 setup = self.start_node_boot()
245 self.daemon.node_up(setup).get(self.TIMEOUT)
246 self.assertEqual(1, self.alive_monitor_count())
247 self.assertFalse(self.node_shutdown.start.called)
249 self.stop_proxy(self.daemon)
250 self.assertShutdownCancellable(False)
252 def test_booted_node_shut_down_when_never_paired(self):
253 cloud_node = testutil.cloud_node_mock(2)
254 setup = self.start_node_boot(cloud_node)
255 self.daemon.node_up(setup).get(self.TIMEOUT)
256 self.assertEqual(1, self.alive_monitor_count())
257 self.daemon.update_cloud_nodes([cloud_node])
259 self.stop_proxy(self.daemon)
260 self.assertShutdownCancellable(False)
262 def test_node_that_pairs_not_considered_failed_boot(self):
263 cloud_node = testutil.cloud_node_mock(3)
264 arv_node = testutil.arvados_node_mock(3)
265 setup = self.start_node_boot(cloud_node, arv_node)
266 self.daemon.node_up(setup).get(self.TIMEOUT)
267 self.assertEqual(1, self.alive_monitor_count())
268 self.daemon.update_cloud_nodes([cloud_node])
269 self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
271 self.stop_proxy(self.daemon)
272 self.assertFalse(self.node_shutdown.start.called)
def test_booting_nodes_shut_down(self):
    """Clearing the wishlist asks booting setup actors to stop."""
    initial_wishlist = [testutil.MockSize(1)]
    self.make_daemon(want_sizes=initial_wishlist)
    self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    booting_setup = self.last_setup
    self.assertTrue(booting_setup.stop_if_no_cloud_node.called)
def test_shutdown_declined_at_wishlist_capacity(self):
    """A node needed to meet the wishlist is not shut down."""
    only_node = testutil.cloud_node_mock(1)
    wanted = testutil.MockSize(1)
    self.make_daemon(cloud_nodes=[only_node], want_sizes=[wanted])
    self.assertEqual(1, self.alive_monitor_count())
    offering_monitor = self.monitor_list()[0].proxy()
    self.daemon.node_can_shutdown(offering_monitor).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertFalse(self.node_shutdown.start.called)
def test_shutdown_declined_below_min_nodes(self):
    """Shutting down would drop below min_nodes, so it is declined."""
    only_node = testutil.cloud_node_mock(1)
    self.make_daemon(cloud_nodes=[only_node], min_nodes=1)
    self.assertEqual(1, self.alive_monitor_count())
    offering_monitor = self.monitor_list()[0].proxy()
    self.daemon.node_can_shutdown(offering_monitor).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertFalse(self.node_shutdown.start.called)
def test_shutdown_accepted_below_capacity(self):
    """With nothing wished for, an idle node's shutdown offer is taken."""
    spare_node = testutil.cloud_node_mock()
    self.make_daemon(cloud_nodes=[spare_node])
    self.assertEqual(1, self.alive_monitor_count())
    offering_monitor = self.monitor_list()[0].proxy()
    self.daemon.node_can_shutdown(offering_monitor).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_shutdown.start.called)
307 def test_shutdown_declined_when_idle_and_job_queued(self):
308 cloud_nodes = [testutil.cloud_node_mock(n) for n in [3, 4]]
309 arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
310 testutil.arvados_node_mock(4, job_uuid=None)]
311 self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(1)])
312 self.assertEqual(2, self.alive_monitor_count())
313 for mon_ref in self.monitor_list():
314 monitor = mon_ref.proxy()
315 if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
318 self.fail("monitor for idle node not found")
319 self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
320 self.stop_proxy(self.daemon)
321 self.assertFalse(self.node_shutdown.start.called)
def test_node_shutdown_after_cancelled_shutdown(self):
    """A node whose shutdown failed can be offered for shutdown again."""
    cloud_node = testutil.cloud_node_mock(5)
    self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    make_proxy = self.node_shutdown.start().proxy
    shutdown = make_proxy()
    shutdown.cloud_node.get.return_value = cloud_node
    # First attempt reports failure (as if the shutdown were cancelled).
    make_proxy().success.get.return_value = False
    make_proxy.reset_mock()
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.assertTrue(make_proxy.called)
    self.daemon.node_finished_shutdown(make_proxy()).get(self.TIMEOUT)
    # Second attempt: the daemon must start a shutdown again.
    make_proxy().success.get.return_value = True
    make_proxy.reset_mock()
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.assertTrue(make_proxy.called)
def test_nodes_shutting_down_replaced_below_max_nodes(self):
    """Below max_nodes, a node being shut down is replaced if wanted."""
    departing = testutil.cloud_node_mock(6)
    self.make_daemon([departing], [testutil.arvados_node_mock(6)])
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.assertTrue(self.node_shutdown.start.called)
    wishlist = [testutil.MockSize(6)]
    self.daemon.update_server_wishlist(wishlist).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_setup.start.called)
352 def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
353 cloud_node = testutil.cloud_node_mock(7)
354 self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
356 self.assertEqual(1, self.alive_monitor_count())
357 monitor = self.monitor_list()[0].proxy()
358 self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
359 self.assertTrue(self.node_shutdown.start.called)
360 self.daemon.update_server_wishlist(
361 [testutil.MockSize(7)]).get(self.TIMEOUT)
362 self.stop_proxy(self.daemon)
363 self.assertFalse(self.node_setup.start.called)
def test_nodes_shutting_down_count_against_excess(self):
    """With two nodes and one wished-for size, only one is shut down."""
    ids = [8, 9]
    cloud_nodes = [testutil.cloud_node_mock(i) for i in ids]
    arv_nodes = [testutil.arvados_node_mock(i) for i in ids]
    self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
    self.assertEqual(2, self.alive_monitor_count())
    for monitor_ref in self.monitor_list():
        offering_monitor = monitor_ref.proxy()
        self.daemon.node_can_shutdown(offering_monitor).get(self.TIMEOUT)
    self.assertEqual(1, self.node_shutdown.start.call_count)
374 def test_clean_shutdown_waits_for_node_setup_finish(self):
375 new_node = self.start_node_boot()
376 self.daemon.shutdown().get(self.TIMEOUT)
377 self.assertTrue(new_node.stop_if_no_cloud_node.called)
378 self.daemon.node_up(new_node).get(self.TIMEOUT)
379 self.assertTrue(new_node.stop.called)
382 self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
384 def test_wishlist_ignored_after_shutdown(self):
385 size = testutil.MockSize(2)
386 self.make_daemon(want_sizes=[size])
387 self.daemon.shutdown().get(self.TIMEOUT)
388 self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
390 self.stop_proxy(self.daemon)
391 self.assertEqual(1, self.node_setup.start.call_count)