3 from __future__ import absolute_import, print_function
11 import arvnodeman.daemon as nmdaemon
12 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
13 from . import testutil
15 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
def make_daemon(self, cloud_nodes=(), arvados_nodes=(), want_sizes=(),
                min_nodes=0, max_nodes=8):
    """Start a NodeManagerDaemonActor backed by mocks and seed its state.

    Every collaborator handed to the daemon (the three pollers, the
    Arvados and cloud factories, the update queue, the timer, and the
    setup/shutdown actor classes) is also stored on ``self`` so tests
    can inspect it.  The daemon's proxy is stored as ``self.daemon``.

    Arguments:
    * cloud_nodes, arvados_nodes, want_sizes: Initial contents sent to
      update_cloud_nodes, update_arvados_nodes, and
      update_server_wishlist, in that order.  Each defaults to an empty
      sequence; pass None to skip that update altogether.
    * min_nodes, max_nodes: Passed through to the daemon constructor.
    """
    for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
        setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
    self.arv_factory = mock.MagicMock(name='arvados_mock')
    self.cloud_factory = mock.MagicMock(name='cloud_mock')
    self.cloud_factory().node_start_time.return_value = time.time()
    self.cloud_updates = mock.MagicMock(name='updates_mock')
    # NOTE(review): presumably deliver_immediately=False means scheduled
    # callbacks wait until a test fires them -- confirm in testutil.
    self.timer = testutil.MockTimer(deliver_immediately=False)
    self.node_setup = mock.MagicMock(name='setup_mock')
    self.node_shutdown = mock.MagicMock(name='shutdown_mock')
    self.daemon = nmdaemon.NodeManagerDaemonActor.start(
        self.server_wishlist_poller, self.arvados_nodes_poller,
        self.cloud_nodes_poller, self.cloud_updates, self.timer,
        self.arv_factory, self.cloud_factory,
        [54, 5, 1], min_nodes, max_nodes, 600, 1800, 3600,
        self.node_setup, self.node_shutdown).proxy()
    # The defaults are immutable tuples, and each update receives a fresh
    # list, so no list object is shared between calls or with the daemon.
    if cloud_nodes is not None:
        self.daemon.update_cloud_nodes(list(cloud_nodes)).get(self.TIMEOUT)
    if arvados_nodes is not None:
        self.daemon.update_arvados_nodes(
            list(arvados_nodes)).get(self.TIMEOUT)
    if want_sizes is not None:
        self.daemon.update_server_wishlist(
            list(want_sizes)).get(self.TIMEOUT)
def monitor_list(self):
    """Return what pykka's registry lists for ComputeNodeMonitorActor."""
    registry = pykka.ActorRegistry
    return registry.get_by_class(ComputeNodeMonitorActor)
def alive_monitor_count(self):
    """Count the entries of monitor_list() whose is_alive() is true."""
    living = [ref for ref in self.monitor_list() if ref.is_alive()]
    return len(living)
def assertShutdownCancellable(self, expected=True):
    """Assert the latest shutdown actor start used cancellable=expected.

    Fails if the mock shutdown class's start() was never called, or if
    the 'cancellable' keyword of its most recent call is not the same
    object as `expected`.
    """
    start_mock = self.node_shutdown.start
    self.assertTrue(start_mock.called)
    # call_args is (args, kwargs); only the keyword arguments matter here.
    last_kwargs = start_mock.call_args[1]
    self.assertIs(expected, last_kwargs['cancellable'],
                  "ComputeNodeShutdownActor incorrectly cancellable")
def test_easy_node_creation(self):
    # One wishlist entry and no existing nodes: a setup actor must start.
    self.make_daemon(want_sizes=[testutil.MockSize(1)])
    self.stop_proxy(self.daemon)
    setup_started = self.node_setup.start.called
    self.assertTrue(setup_started)
def test_node_pairing(self):
    # Hand the daemon one cloud node and one Arvados node record, then
    # check that the single monitor it creates carries that record.
    cloud_node = testutil.cloud_node_mock(1)
    arv_node = testutil.arvados_node_mock(1)
    self.make_daemon([cloud_node], [arv_node])
    self.stop_proxy(self.daemon)
    self.assertEqual(1, self.alive_monitor_count())
    # NOTE(review): as written, this lookup was a bare expression with a
    # trailing comma, so nothing was checked.  Comparing it to arv_node
    # by identity is the assumed intent -- confirm.
    self.assertIs(
        self.monitor_list()[0].proxy().arvados_node.get(self.TIMEOUT),
        arv_node)
def test_node_pairing_after_arvados_update(self):
    # Start with an Arvados record that has no IP address, then send an
    # updated record and check the monitor ends up holding the new one.
    cloud_node = testutil.cloud_node_mock(2)
    self.make_daemon([cloud_node],
                     [testutil.arvados_node_mock(2, ip_address=None)])
    arv_node = testutil.arvados_node_mock(2)
    self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertEqual(1, self.alive_monitor_count())
    # NOTE(review): as written, this lookup was a bare expression with a
    # trailing comma, so nothing was checked.  Comparing it to arv_node
    # by identity is the assumed intent -- confirm.
    self.assertIs(
        self.monitor_list()[0].proxy().arvados_node.get(self.TIMEOUT),
        arv_node)
def test_old_arvados_node_not_double_assigned(self):
    # With one old Arvados node record and two requested nodes, the
    # record may be handed to one setup only; the other setup gets None.
    arv_node = testutil.arvados_node_mock(3, age=9000)
    size = testutil.MockSize(3)
    self.make_daemon(arvados_nodes=[arv_node])
    setup_ref = self.node_setup.start().proxy().actor_ref
    setup_ref.actor_urn = 0
    # Forget the start() call made just above to reach the mock ref.
    self.node_setup.start.reset_mock()
    first_request = self.daemon.update_server_wishlist([size])
    first_request.get(self.TIMEOUT)
    self.daemon.max_nodes.get(self.TIMEOUT)
    # Change the URN before the second setup is requested.
    setup_ref.actor_urn += 1
    second_request = self.daemon.update_server_wishlist([size] * 2)
    second_request.get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    used_nodes = []
    for recorded in self.node_setup.start.call_args_list:
        # recorded[1] holds the keyword arguments of that start() call.
        used_nodes.append(recorded[1].get('arvados_node'))
    self.assertEqual(2, len(used_nodes))
    self.assertIn(arv_node, used_nodes)
    self.assertIn(None, used_nodes)
def test_node_count_satisfied(self):
    # One cloud node already exists for a one-entry wishlist, so no
    # setup actor should be started.
    self.make_daemon([testutil.cloud_node_mock()],
                     want_sizes=[testutil.MockSize(1)])
    self.stop_proxy(self.daemon)
    # Check start(), like the other tests here do: the mock class object
    # itself is never called, so `node_setup.called` could not fail.
    self.assertFalse(self.node_setup.start.called)
def test_booting_nodes_counted(self):
    # Two wishlist entries with one node already present: exactly one
    # setup starts, and repeating the same wishlist must not add another.
    cloud_node = testutil.cloud_node_mock(1)
    arv_node = testutil.arvados_node_mock(1)
    wishlist = [testutil.MockSize(1)] * 2
    self.make_daemon([cloud_node], [arv_node], wishlist)
    self.daemon.max_nodes.get(self.TIMEOUT)
    start_mock = self.node_setup.start
    self.assertTrue(start_mock.called)
    self.daemon.update_server_wishlist(wishlist).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertEqual(1, start_mock.call_count)
def test_boot_new_node_when_all_nodes_busy(self):
    # The only node has job_uuid set, so a wishlist entry should start
    # a new setup.
    busy_cloud_node = testutil.cloud_node_mock(2)
    busy_arv_node = testutil.arvados_node_mock(2, job_uuid=True)
    wishlist = [testutil.MockSize(2)]
    self.make_daemon([busy_cloud_node], [busy_arv_node], wishlist)
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_setup.start.called)
def test_no_new_node_when_max_nodes_busy(self):
    # The only node has job_uuid set and the node cap is already
    # reached, so a wishlist entry must not start another setup.
    # NOTE(review): the make_daemon call was left unterminated; capping
    # max_nodes at 1 is inferred from the test name and the assertion --
    # confirm the intended value.
    self.make_daemon([testutil.cloud_node_mock(3)],
                     [testutil.arvados_node_mock(3, job_uuid=True)],
                     [testutil.MockSize(3)],
                     max_nodes=1)
    self.stop_proxy(self.daemon)
    self.assertFalse(self.node_setup.start.called)
def mock_setup_actor(self, cloud_node, arv_node):
    """Configure and return the mock setup-actor proxy for a booting node.

    The proxy is the one the mock setup class yields from
    ``start().proxy()``.  Its ``actor_urn`` is set to ``cloud_node.id``,
    and its ``cloud_node`` and ``arvados_node`` futures are primed to
    return the given objects.
    """
    setup = self.node_setup.start().proxy()
    # Drop the call records, including the start() call made just above.
    self.node_setup.reset_mock()
    setup.actor_urn = cloud_node.id
    setup.cloud_node.get.return_value = cloud_node
    setup.arvados_node.get.return_value = arv_node
    # start_node_boot() returns this value and tests pass it to node_up(),
    # so the configured proxy has to be handed back.
    return setup
def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
    """Start a daemon that boots one node; return the mock setup proxy.

    Arguments:
    * cloud_node, arv_node: Objects the mock setup actor should report.
      Whichever is None is replaced with a testutil mock built from
      `id_num`.
    * id_num: Number used for the default mocks and the wishlist size.
    """
    if cloud_node is None:
        cloud_node = testutil.cloud_node_mock(id_num)
    # Only fill in a default; a caller-supplied Arvados node must be kept.
    if arv_node is None:
        arv_node = testutil.arvados_node_mock(id_num)
    self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
    # NOTE(review): presumably this round-trip lets the daemon finish
    # queued work before the call count is checked -- confirm.
    self.daemon.max_nodes.get(self.TIMEOUT)
    self.assertEqual(1, self.node_setup.start.call_count)
    return self.mock_setup_actor(cloud_node, arv_node)
def test_no_duplication_when_booting_node_listed_fast(self):
    # A cloud listing can mention a booting node before the "node up"
    # message from CloudNodeSetupActor arrives.  That ordering must
    # still leave exactly one ComputeNodeMonitorActor running.
    node = testutil.cloud_node_mock(1)
    setup = self.start_node_boot(node)
    self.daemon.update_cloud_nodes([node])
    node_up_done = self.daemon.node_up(setup)
    node_up_done.get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
def test_no_duplication_when_booted_node_listed(self):
    # "Node up" is sent first and the cloud listing second; there must
    # still be exactly one monitor afterwards.
    node = testutil.cloud_node_mock(2)
    setup = self.start_node_boot(node, id_num=2)
    self.daemon.node_up(setup)
    listing_done = self.daemon.update_cloud_nodes([node])
    listing_done.get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
def test_node_counted_after_boot_with_slow_listing(self):
    # After we boot a compute node, keep assuming it exists even if it
    # doesn't appear in the cloud listing (e.g., because of delays).
    setup = self.start_node_boot()
    node_up_done = self.daemon.node_up(setup)
    node_up_done.get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
    # An empty listing must not make the monitor go away.
    empty_listing_done = self.daemon.update_cloud_nodes([])
    empty_listing_done.get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
def test_booted_unlisted_node_counted(self):
    # A node that reported up but was never listed should still satisfy
    # a one-entry wishlist, so no further setup starts.
    setup = self.start_node_boot(id_num=1)
    self.daemon.node_up(setup)
    wishlist = [testutil.MockSize(1)]
    self.daemon.update_server_wishlist(wishlist).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    another_setup_started = self.node_setup.start.called
    self.assertFalse(another_setup_started,
                     "daemon did not count booted node toward wishlist")
def test_booted_node_can_shutdown(self):
    # Once the wishlist is empty, a booted node that offers to shut
    # down should get a shutdown actor.
    setup = self.start_node_boot()
    self.daemon.node_up(setup).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
    monitor_ref = self.monitor_list()[0]
    monitor = monitor_ref.proxy()
    self.daemon.update_server_wishlist([])
    offer_done = self.daemon.node_can_shutdown(monitor)
    offer_done.get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_shutdown.start.called,
                    "daemon did not shut down booted node on offer")
def test_booted_node_lifecycle(self):
    # Walk one node through boot, shutdown offer, and finished shutdown,
    # then check that a new wishlist entry starts a second node.
    node = testutil.cloud_node_mock(6)
    setup = self.start_node_boot(node, id_num=6)
    self.daemon.node_up(setup).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()

    # Empty wishlist, then let the node offer to shut down.
    self.daemon.update_server_wishlist([])
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.assertShutdownCancellable(True)

    # Report the shutdown as finished through the mock shutdown proxy.
    shutdown = self.node_shutdown.start().proxy()
    shutdown.cloud_node.get.return_value = node
    self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
    self.assertTrue(shutdown.stop.called,
                    "shutdown actor not stopped after finishing")
    monitor_stopped = monitor.actor_ref.actor_stopped.wait(self.TIMEOUT)
    self.assertTrue(monitor_stopped,
                    "monitor for booted node not stopped after shutdown")

    # With the first node gone, a new request should boot another.
    new_wishlist = [testutil.MockSize(2)]
    self.daemon.update_server_wishlist(new_wishlist).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_setup.start.called,
                    "second node not started after booted node stopped")
def test_booted_node_shut_down_when_never_listed(self):
    # Boot a node and report it up without ever sending a cloud listing
    # that contains it; the daemon is expected to start a shutdown that
    # is not cancellable.
    setup = self.start_node_boot()
    self.daemon.node_up(setup).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
    self.assertFalse(self.node_shutdown.start.called)
    # NOTE(review): nothing visible between the check above and the one
    # below would start a shutdown.  A step that fires the mock timer
    # (created with deliver_immediately=False) may be missing here --
    # confirm against the full file.
    self.stop_proxy(self.daemon)
    self.assertShutdownCancellable(False)
def test_booted_node_shut_down_when_never_paired(self):
    # Boot a node, report it up, and list it in the cloud without ever
    # supplying a matching Arvados node update; the daemon is expected
    # to start a shutdown that is not cancellable.
    cloud_node = testutil.cloud_node_mock(2)
    setup = self.start_node_boot(cloud_node)
    self.daemon.node_up(setup).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
    self.daemon.update_cloud_nodes([cloud_node])
    # NOTE(review): nothing visible after the listing would start a
    # shutdown.  A step that fires the mock timer (created with
    # deliver_immediately=False) may be missing here -- confirm against
    # the full file.
    self.stop_proxy(self.daemon)
    self.assertShutdownCancellable(False)
def test_node_that_pairs_not_considered_failed_boot(self):
    # Boot a node, report it up, then send both a cloud listing and an
    # Arvados listing that contain it; no shutdown may be started.
    cloud_node = testutil.cloud_node_mock(3)
    arv_node = testutil.arvados_node_mock(3)
    setup = self.start_node_boot(cloud_node, arv_node)
    self.daemon.node_up(setup).get(self.TIMEOUT)
    self.assertEqual(1, self.alive_monitor_count())
    self.daemon.update_cloud_nodes([cloud_node])
    self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
    # NOTE(review): a step appears to be missing before the daemon is
    # stopped (possibly firing the mock timer so a failed-boot check
    # gets a chance to run) -- confirm against the full file.
    self.stop_proxy(self.daemon)
    self.assertFalse(self.node_shutdown.start.called)
def test_booting_nodes_shut_down(self):
    # Request one node, then withdraw the request while it is still
    # booting; the setup actor should be asked to stop.
    self.make_daemon(want_sizes=[testutil.MockSize(1)])
    self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    # NOTE(review): the expression below had an unmatched closing
    # parenthesis; wrapping it in assertTrue is inferred from the test
    # name -- confirm.
    self.assertTrue(
        self.node_setup.start().proxy().stop_if_no_cloud_node.called)
def test_shutdown_declined_at_wishlist_capacity(self):
    # One node for a one-entry wishlist: a shutdown offer from that node
    # must be turned down.
    node = testutil.cloud_node_mock(1)
    wanted = testutil.MockSize(1)
    self.make_daemon(cloud_nodes=[node], want_sizes=[wanted])
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    offer_done = self.daemon.node_can_shutdown(monitor)
    offer_done.get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertFalse(self.node_shutdown.start.called)
def test_shutdown_declined_below_min_nodes(self):
    # With min_nodes=1 and a single node, a shutdown offer must be
    # turned down even though the wishlist is empty.
    node = testutil.cloud_node_mock(1)
    self.make_daemon(cloud_nodes=[node], min_nodes=1)
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    offer_done = self.daemon.node_can_shutdown(monitor)
    offer_done.get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertFalse(self.node_shutdown.start.called)
def test_shutdown_accepted_below_capacity(self):
    # One node and an empty wishlist: a shutdown offer is accepted.
    nodes = [testutil.cloud_node_mock()]
    self.make_daemon(cloud_nodes=nodes)
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    offer_done = self.daemon.node_can_shutdown(monitor)
    offer_done.get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_shutdown.start.called)
def test_shutdown_declined_when_idle_and_job_queued(self):
    # Two nodes, one with job_uuid set and one without, plus one queued
    # size: a shutdown offer from the node without a job must be
    # turned down.
    cloud_nodes = [testutil.cloud_node_mock(n) for n in [3, 4]]
    arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                 testutil.arvados_node_mock(4, job_uuid=None)]
    self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(1)])
    self.assertEqual(2, self.alive_monitor_count())
    # Find the monitor watching the last cloud node.  Stop at the first
    # match so `monitor` still refers to it after the loop; the else
    # branch runs only when no monitor matched.
    for mon_ref in self.monitor_list():
        monitor = mon_ref.proxy()
        if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
            break
    else:
        self.fail("monitor for idle node not found")
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertFalse(self.node_shutdown.start.called)
def test_node_shutdown_after_cancelled_shutdown(self):
    # A shutdown that finishes unsuccessfully must not prevent a later
    # shutdown offer from the same node being accepted.
    node = testutil.cloud_node_mock(5)
    self.make_daemon([node], [testutil.arvados_node_mock(5)])
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    proxy_factory = self.node_shutdown.start().proxy

    # First attempt: the mock shutdown actor reports success == False.
    shutdown = proxy_factory()
    shutdown.cloud_node.get.return_value = node
    shutdown.success.get.return_value = False
    proxy_factory.reset_mock()
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.assertTrue(proxy_factory.called)
    finished = self.daemon.node_finished_shutdown(proxy_factory())
    finished.get(self.TIMEOUT)

    # Second attempt: success == True; the offer must be taken again.
    proxy_factory().success.get.return_value = True
    proxy_factory.reset_mock()
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.assertTrue(proxy_factory.called)
def test_nodes_shutting_down_replaced_below_max_nodes(self):
    # While the only node is shutting down, a new wishlist entry should
    # start a replacement (the default max_nodes leaves room for it).
    node = testutil.cloud_node_mock(6)
    self.make_daemon([node], [testutil.arvados_node_mock(6)])
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    offer_done = self.daemon.node_can_shutdown(monitor)
    offer_done.get(self.TIMEOUT)
    self.assertTrue(self.node_shutdown.start.called)
    wishlist = [testutil.MockSize(6)]
    self.daemon.update_server_wishlist(wishlist).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertTrue(self.node_setup.start.called)
def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
    # While the only node is shutting down and the node cap is already
    # reached, a new wishlist entry must not start a replacement.
    cloud_node = testutil.cloud_node_mock(7)
    # NOTE(review): the make_daemon call was left unterminated; capping
    # max_nodes at 1 is inferred from the test name and the final
    # assertion -- confirm the intended value.
    self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                     max_nodes=1)
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
    self.assertTrue(self.node_shutdown.start.called)
    self.daemon.update_server_wishlist(
        [testutil.MockSize(7)]).get(self.TIMEOUT)
    self.stop_proxy(self.daemon)
    self.assertFalse(self.node_setup.start.called)
def test_nodes_shutting_down_count_against_excess(self):
    # Two nodes, one wishlist entry: when both offer to shut down, only
    # one shutdown may start.
    node_numbers = [8, 9]
    cloud_nodes = [testutil.cloud_node_mock(n) for n in node_numbers]
    arv_nodes = [testutil.arvados_node_mock(n) for n in node_numbers]
    self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
    self.assertEqual(2, self.alive_monitor_count())
    for mon_ref in self.monitor_list():
        offer_done = self.daemon.node_can_shutdown(mon_ref.proxy())
        offer_done.get(self.TIMEOUT)
    self.assertEqual(1, self.node_shutdown.start.call_count)
def test_clean_shutdown_waits_for_node_setup_finish(self):
    # Shut the daemon down while a node is still booting: the setup
    # actor is asked to stop if it has no cloud node, and once it
    # reports up it is stopped and the daemon itself should stop.
    new_node = self.start_node_boot()
    self.daemon.shutdown().get(self.TIMEOUT)
    self.assertTrue(new_node.stop_if_no_cloud_node.called)
    self.daemon.node_up(new_node).get(self.TIMEOUT)
    self.assertTrue(new_node.stop.called)
    # NOTE(review): the wait below had an unmatched closing parenthesis;
    # wrapping it in assertTrue is the assumed intent.  A further step
    # before it (possibly firing the mock timer) may also be missing --
    # confirm against the full file.
    self.assertTrue(
        self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
def test_wishlist_ignored_after_shutdown(self):
    # One setup starts for the initial one-entry wishlist.  After
    # shutdown() a larger wishlist must not start any more, so the
    # start() call count stays at 1.
    size = testutil.MockSize(2)
    self.make_daemon(want_sizes=[size])
    self.daemon.shutdown().get(self.TIMEOUT)
    self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
    # NOTE(review): a step appears to be missing before the daemon is
    # stopped (possibly firing the mock timer) -- confirm against the
    # full file.
    self.stop_proxy(self.daemon)
    self.assertEqual(1, self.node_setup.start.call_count)