2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
11 import arvados.errors as arverror
17 from libcloud.common.exceptions import BaseHTTPError
19 import arvnodeman.computenode.dispatch as dispatch
20 import arvnodeman.status as status
21 from arvnodeman.computenode.driver import BaseComputeNodeDriver
22 from . import testutil
24 class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
25 ACTOR_CLASS = dispatch.ComputeNodeSetupActor
27 def make_mocks(self, arvados_effect=None):
28 if arvados_effect is None:
29 arvados_effect = [testutil.arvados_node_mock(
35 self.arvados_effect = arvados_effect
36 self.timer = testutil.MockTimer()
37 self.api_client = mock.MagicMock(name='api_client')
38 self.api_client.nodes().create().execute.side_effect = arvados_effect
39 self.api_client.nodes().update().execute.side_effect = arvados_effect
40 self.cloud_client = mock.MagicMock(name='cloud_client')
41 self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
def make_actor(self, arv_node=None):
    # Start the setup actor under test and keep its proxy on
    # self.setup_actor.  Mocks are only built here when the test has not
    # already called make_mocks() (detected via the 'timer' attribute).
    if not hasattr(self, 'timer'):
        effect = [arv_node] if arv_node else None
        self.make_mocks(arvados_effect=effect)
    actor_ref = self.ACTOR_CLASS.start(
        self.timer,
        self.api_client,
        self.cloud_client,
        testutil.MockSize(1),
        arv_node)
    self.setup_actor = actor_ref.proxy()
50 def assert_node_properties_updated(self, uuid=None,
51 size=testutil.MockSize(1)):
52 self.api_client.nodes().update.assert_any_call(
53 uuid=(uuid or self.arvados_effect[-1]['uuid']),
58 'price': size.price}}})
60 def test_creation_without_arvados_node(self):
62 finished = threading.Event()
63 self.setup_actor.subscribe(lambda _: finished.set())
64 self.assertEqual(self.arvados_effect[-1],
65 self.setup_actor.arvados_node.get(self.TIMEOUT))
66 assert(finished.wait(self.TIMEOUT))
67 self.api_client.nodes().create.called_with(body={}, assign_slot=True)
68 self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
69 self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
70 self.assert_node_properties_updated()
71 self.assertEqual(self.cloud_client.create_node(),
72 self.setup_actor.cloud_node.get(self.TIMEOUT))
def test_creation_with_arvados_node(self):
    # Run setup with an Arvados node record supplied up front, wait for the
    # subscriber callback, then check the resulting Arvados and cloud nodes.
    self.make_mocks(arvados_effect=[testutil.arvados_node_mock()]*2)
    self.make_actor(testutil.arvados_node_mock())
    finished = threading.Event()
    self.setup_actor.subscribe(lambda _: finished.set())
    self.assertEqual(self.arvados_effect[-1],
                     self.setup_actor.arvados_node.get(self.TIMEOUT))
    # Use a unittest assertion: a bare `assert` is stripped under `python -O`
    # and would silently stop checking that the subscriber fired.
    self.assertTrue(finished.wait(self.TIMEOUT))
    self.assert_node_properties_updated()
    # NOTE(review): `called_with` is not a Mock assertion method; it is an
    # auto-created child mock, so this line can never fail.  Confirm the
    # intended expectation before converting it to a real assert_* call.
    self.api_client.nodes().create.called_with(body={}, assign_slot=True)
    self.assertEqual(3, self.api_client.nodes().update().execute.call_count)
    self.assertEqual(self.cloud_client.create_node(),
                     self.setup_actor.cloud_node.get(self.TIMEOUT))
88 def test_failed_arvados_calls_retried(self):
90 arverror.ApiError(httplib2.Response({'status': '500'}), ""),
91 testutil.arvados_node_mock(),
94 self.wait_for_assignment(self.setup_actor, 'arvados_node')
96 def test_failed_cloud_calls_retried(self):
98 self.cloud_client.create_node.side_effect = [
99 Exception("test cloud creation error"),
100 self.cloud_client.create_node.return_value,
103 self.wait_for_assignment(self.setup_actor, 'cloud_node')
105 def test_basehttperror_retried(self):
107 self.cloud_client.create_node.side_effect = [
108 BaseHTTPError(500, "Try again"),
109 self.cloud_client.create_node.return_value,
112 self.wait_for_assignment(self.setup_actor, 'cloud_node')
113 self.setup_actor.ping().get(self.TIMEOUT)
114 self.assertEqual(1, self.cloud_client.post_create_node.call_count)
116 def test_instance_exceeded_not_retried(self):
118 self.cloud_client.create_node.side_effect = [
119 BaseHTTPError(400, "InstanceLimitExceeded"),
120 self.cloud_client.create_node.return_value,
123 done = self.FUTURE_CLASS()
124 self.setup_actor.subscribe(done.set)
125 done.get(self.TIMEOUT)
126 self.assertEqual(0, self.cloud_client.post_create_node.call_count)
128 def test_failed_post_create_retried(self):
130 self.cloud_client.post_create_node.side_effect = [
131 Exception("test cloud post-create error"), None]
133 done = self.FUTURE_CLASS()
134 self.setup_actor.subscribe(done.set)
135 done.get(self.TIMEOUT)
136 self.assertEqual(2, self.cloud_client.post_create_node.call_count)
138 def test_stop_when_no_cloud_node(self):
140 arverror.ApiError(httplib2.Response({'status': '500'}), ""))
143 self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT))
145 self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
147 def test_no_stop_when_cloud_node(self):
149 self.wait_for_assignment(self.setup_actor, 'cloud_node')
151 self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT))
152 self.assertTrue(self.stop_proxy(self.setup_actor),
153 "actor was stopped by stop_if_no_cloud_node")
155 def test_subscribe(self):
157 arverror.ApiError(httplib2.Response({'status': '500'}), ""))
159 subscriber = mock.Mock(name='subscriber_mock')
160 self.setup_actor.subscribe(subscriber)
161 retry_resp = [testutil.arvados_node_mock()]
162 self.api_client.nodes().create().execute.side_effect = retry_resp
163 self.api_client.nodes().update().execute.side_effect = retry_resp
164 self.wait_for_assignment(self.setup_actor, 'cloud_node')
165 self.setup_actor.ping().get(self.TIMEOUT)
166 self.assertEqual(self.setup_actor.actor_ref.actor_urn,
167 subscriber.call_args[0][0].actor_ref.actor_urn)
169 def test_late_subscribe(self):
171 subscriber = mock.Mock(name='subscriber_mock')
172 self.wait_for_assignment(self.setup_actor, 'cloud_node')
173 self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
174 self.stop_proxy(self.setup_actor)
175 self.assertEqual(self.setup_actor.actor_ref.actor_urn,
176 subscriber.call_args[0][0].actor_ref.actor_urn)
179 class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
def make_mocks(self, cloud_node=None, arvados_node=None,
               shutdown_open=True, node_broken=False):
    # Build the collaborators a shutdown actor test needs and store them on
    # the test case.  `shutdown_open` seeds the mock shutdown timer state and
    # `node_broken` is what cloud_client.broken() will report.
    self.timer = testutil.MockTimer()

    shutdown_timer = testutil.MockShutdownTimer()
    shutdown_timer._set_state(shutdown_open, 300)
    self.shutdowns = shutdown_timer

    cloud_client = mock.MagicMock(name='cloud_client')
    cloud_client.broken.return_value = node_broken
    self.cloud_client = cloud_client

    self.arvados_client = mock.MagicMock(name='arvados_client')
    self.updates = mock.MagicMock(name='update_mock')

    # Fall back to a default cloud node mock when the caller gave none.
    self.cloud_node = (testutil.cloud_node_mock()
                       if cloud_node is None else cloud_node)
    self.arvados_node = arvados_node
194 def make_actor(self, cancellable=True, start_time=None):
195 if not hasattr(self, 'timer'):
197 if start_time is None:
198 start_time = time.time()
199 monitor_actor = dispatch.ComputeNodeMonitorActor.start(
200 self.cloud_node, start_time, self.shutdowns,
201 self.timer, self.updates, self.cloud_client,
203 self.shutdown_actor = self.ACTOR_CLASS.start(
204 self.timer, self.cloud_client, self.arvados_client, monitor_actor,
206 self.monitor_actor = monitor_actor.proxy()
208 def check_success_flag(self, expected, allow_msg_count=1):
209 # allow_msg_count is the number of internal messages that may
210 # need to be handled for shutdown to finish.
211 for _ in range(1 + allow_msg_count):
212 last_flag = self.shutdown_actor.success.get(self.TIMEOUT)
213 if last_flag is expected:
216 self.fail("success flag {} is not {}".format(last_flag, expected))
def test_boot_failure_counting(self, *mocks):
    # A boot failure happens when a node transitions from unpaired to shutdown
    status.tracker.update({'boot_failures': 0})
    unpaired_node = testutil.arvados_node_mock(
        crunch_worker_state="unpaired")
    self.make_mocks(shutdown_open=True, arvados_node=unpaired_node)
    destroy_node = self.cloud_client.destroy_node
    destroy_node.return_value = True
    self.make_actor(cancellable=False)
    self.check_success_flag(True, 2)
    self.assertTrue(destroy_node.called)
    # Exactly one boot failure must have been recorded by the tracker.
    self.assertEqual(1, status.tracker.get('boot_failures'))
def test_cancellable_shutdown(self, *mocks):
    # With a busy node and a cancellable shutdown, the success flag should
    # end up False and the cloud node must not be destroyed.
    busy_node = testutil.arvados_node_mock(crunch_worker_state="busy")
    self.make_mocks(shutdown_open=True, arvados_node=busy_node)
    destroy_node = self.cloud_client.destroy_node
    destroy_node.return_value = True
    self.make_actor(cancellable=True)
    self.check_success_flag(False, 2)
    self.assertFalse(destroy_node.called)
def test_uncancellable_shutdown(self, *mocks):
    # With a busy node and a non-cancellable shutdown, the success flag
    # should end up True and the cloud node must be destroyed.
    status.tracker.update({'boot_failures': 0})
    busy_node = testutil.arvados_node_mock(crunch_worker_state="busy")
    self.make_mocks(shutdown_open=True, arvados_node=busy_node)
    destroy_node = self.cloud_client.destroy_node
    destroy_node.return_value = True
    self.make_actor(cancellable=False)
    self.check_success_flag(True, 4)
    self.assertTrue(destroy_node.called)
    # A normal shutdown shouldn't be counted as boot failure
    boot_failures = status.tracker.get('boot_failures')
    self.assertEqual(0, boot_failures)
245 def test_arvados_node_cleaned_after_shutdown(self, *mocks):
247 mocks[0].return_value = "drain\n"
248 cloud_node = testutil.cloud_node_mock(62)
249 arv_node = testutil.arvados_node_mock(62)
250 self.make_mocks(cloud_node, arv_node)
252 self.check_success_flag(True, 3)
253 update_mock = self.arvados_client.nodes().update
254 self.assertTrue(update_mock.called)
255 update_kwargs = update_mock.call_args_list[0][1]
256 self.assertEqual(arv_node['uuid'], update_kwargs.get('uuid'))
257 self.assertIn('body', update_kwargs)
258 for clear_key in ['slot_number', 'hostname', 'ip_address',
259 'first_ping_at', 'last_ping_at']:
260 self.assertIn(clear_key, update_kwargs['body'])
261 self.assertIsNone(update_kwargs['body'][clear_key])
262 self.assertTrue(update_mock().execute.called)
264 def test_arvados_node_not_cleaned_after_shutdown_cancelled(self, *mocks):
266 mocks[0].return_value = "idle\n"
267 cloud_node = testutil.cloud_node_mock(61)
268 arv_node = testutil.arvados_node_mock(61)
269 self.make_mocks(cloud_node, arv_node, shutdown_open=False)
270 self.cloud_client.destroy_node.return_value = False
271 self.make_actor(cancellable=True)
272 self.shutdown_actor.cancel_shutdown("test")
273 self.shutdown_actor.ping().get(self.TIMEOUT)
274 self.check_success_flag(False, 2)
275 self.assertFalse(self.arvados_client.nodes().update.called)
278 class ComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
280 ACTOR_CLASS = dispatch.ComputeNodeShutdownActor
def test_easy_shutdown(self):
    # Default mocks: the shutdown should succeed and destroy the cloud node.
    self.make_actor(start_time=0)
    self.check_success_flag(True)
    destroy_node = self.cloud_client.destroy_node
    self.assertTrue(destroy_node.called)
def test_shutdown_cancelled_when_destroy_node_fails(self):
    # destroy_node() reporting False must leave the success flag False,
    # after a single destroy attempt, with DESTROY_FAILED as cancel reason.
    self.make_mocks(node_broken=True)
    destroy_node = self.cloud_client.destroy_node
    destroy_node.return_value = False
    self.make_actor(start_time=0)
    self.check_success_flag(False, 2)
    self.assertEqual(1, destroy_node.call_count)
    expected_reason = self.ACTOR_CLASS.DESTROY_FAILED
    actual_reason = self.shutdown_actor.cancel_reason.get(self.TIMEOUT)
    self.assertEqual(expected_reason, actual_reason)
296 def test_late_subscribe(self):
298 subscriber = mock.Mock(name='subscriber_mock')
299 self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
300 self.stop_proxy(self.shutdown_actor)
301 self.assertTrue(subscriber.called)
302 self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
303 subscriber.call_args[0][0].actor_ref.actor_urn)
306 class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
308 ACTOR_CLASS = dispatch.ComputeNodeUpdateActor
def make_actor(self):
    # Start the update actor with mock driver and timer; keep its proxy on
    # self.updater.
    driver = mock.MagicMock(name='driver_mock')
    timer = mock.MagicMock(name='timer_mock')
    self.driver = driver
    self.timer = timer
    actor_ref = self.ACTOR_CLASS.start(driver, timer)
    self.updater = actor_ref.proxy()
315 def test_node_sync(self, *args):
317 cloud_node = testutil.cloud_node_mock()
318 arv_node = testutil.arvados_node_mock()
319 self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
320 self.driver().sync_node.assert_called_with(cloud_node, arv_node)
323 def test_node_sync_error(self, *args):
325 cloud_node = testutil.cloud_node_mock()
326 arv_node = testutil.arvados_node_mock()
327 self.driver().sync_node.side_effect = (IOError, Exception, True)
328 self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
329 self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
330 self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
331 self.driver().sync_node.assert_called_with(cloud_node, arv_node)
333 class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
def make_mocks(self, node_num):
    # Build the collaborators a monitor actor test needs.  The shutdown timer
    # starts in state (False, 300) and the cloud client reports the node as
    # not broken.
    shutdown_timer = testutil.MockShutdownTimer()
    shutdown_timer._set_state(False, 300)
    self.shutdowns = shutdown_timer

    self.timer = mock.MagicMock(name='timer_mock')
    self.updates = mock.MagicMock(name='update_mock')
    self.cloud_mock = testutil.cloud_node_mock(node_num)
    self.subscriber = mock.Mock(name='subscriber_mock')

    cloud_client = mock.MagicMock(name='cloud_client')
    cloud_client.broken.return_value = False
    self.cloud_client = cloud_client
def make_actor(self, node_num=1, arv_node=None, start_time=None):
    # Start a monitor actor for self.cloud_mock, keep its proxy on
    # self.node_actor, and register self.subscriber with it.  Mocks are built
    # on demand when the test has not called make_mocks() itself.
    if not hasattr(self, 'cloud_mock'):
        self.make_mocks(node_num)
    started_at = time.time() if start_time is None else start_time
    actor_ref = dispatch.ComputeNodeMonitorActor.start(
        self.cloud_mock, started_at, self.shutdowns,
        self.timer, self.updates, self.cloud_client,
        arv_node, boot_fail_after=300)
    self.node_actor = actor_ref.proxy()
    # Block until the subscription has been processed by the actor.
    subscribed = self.node_actor.subscribe(self.subscriber)
    subscribed.get(self.TIMEOUT)
def node_state(self, *states):
    # Ask the monitor actor whether it is in any of `states`, waiting up to
    # self.TIMEOUT for the answer.
    reply = self.node_actor.in_state(*states)
    return reply.get(self.TIMEOUT)
359 def test_in_state_when_unpaired(self):
361 self.assertTrue(self.node_state('unpaired'))
def test_in_state_when_pairing_stale(self):
    # An Arvados record with age=90000 and no job should report 'down'.
    stale_node = testutil.arvados_node_mock(job_uuid=None, age=90000)
    self.make_actor(arv_node=stale_node)
    in_down_state = self.node_state('down')
    self.assertTrue(in_down_state)
def test_in_state_when_no_state_available(self):
    # A record with crunch_worker_state=None should report 'idle'.
    stateless_node = testutil.arvados_node_mock(crunch_worker_state=None)
    self.make_actor(arv_node=stateless_node)
    in_idle_state = self.node_state('idle')
    self.assertTrue(in_idle_state)
def test_in_state_when_no_state_available_old(self):
    # A record with crunch_worker_state=None and age=90000 should report
    # 'down'.
    old_stateless_node = testutil.arvados_node_mock(
        crunch_worker_state=None, age=90000)
    self.make_actor(arv_node=old_stateless_node)
    in_down_state = self.node_state('down')
    self.assertTrue(in_down_state)
def test_in_idle_state(self):
    # A node with no job should be 'idle', not 'busy', and should add exactly
    # one entry to the status tracker's idle-node table.
    #
    # Take a real snapshot of the keys: on Python 3 `.keys()` is a live view,
    # so keeping the view itself would make "before" track later insertions
    # and the new-node count below would always be zero.
    idle_nodes_before = set(status.tracker._idle_nodes.keys())
    self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
    self.assertTrue(self.node_state('idle'))
    self.assertFalse(self.node_state('busy'))
    self.assertTrue(self.node_state('idle', 'busy'))
    idle_nodes_after = set(status.tracker._idle_nodes.keys())
    new_idle_nodes = idle_nodes_after - idle_nodes_before
    # There should be 1 additional idle node
    self.assertEqual(1, len(new_idle_nodes))
def test_in_busy_state(self):
    # A node with a job should be 'busy', not 'idle', and must not add any
    # entry to the status tracker's idle-node table.
    #
    # Take a real snapshot of the keys: on Python 3 `.keys()` is a live view,
    # so comparing two views of the same dict would make this check vacuous.
    idle_nodes_before = set(status.tracker._idle_nodes.keys())
    self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
    self.assertFalse(self.node_state('idle'))
    self.assertTrue(self.node_state('busy'))
    self.assertTrue(self.node_state('idle', 'busy'))
    idle_nodes_after = set(status.tracker._idle_nodes.keys())
    new_idle_nodes = idle_nodes_after - idle_nodes_before
    # There shouldn't be any additional idle node
    self.assertEqual(0, len(new_idle_nodes))
400 def test_init_shutdown_scheduling(self):
402 self.assertTrue(self.timer.schedule.called)
403 self.assertEqual(300, self.timer.schedule.call_args[0][0])
405 def test_shutdown_window_close_scheduling(self):
407 self.shutdowns._set_state(False, 600)
408 self.timer.schedule.reset_mock()
409 self.node_actor.consider_shutdown().get(self.TIMEOUT)
410 self.stop_proxy(self.node_actor)
411 self.assertTrue(self.timer.schedule.called)
412 self.assertEqual(600, self.timer.schedule.call_args[0][0])
413 self.assertFalse(self.subscriber.called)
def test_shutdown_subscription(self):
    # After consider_shutdown() with the shutdown state set to (True, 600),
    # the subscriber must have been called with the monitor actor itself.
    self.make_actor(start_time=0)
    self.shutdowns._set_state(True, 600)
    self.node_actor.consider_shutdown().get(self.TIMEOUT)
    self.assertTrue(self.subscriber.called)
    expected_urn = self.node_actor.actor_ref.actor_urn
    notified_actor = self.subscriber.call_args[0][0]
    self.assertEqual(expected_urn, notified_actor.actor_ref.actor_urn)
423 def test_no_shutdown_booting(self):
425 self.shutdowns._set_state(True, 600)
426 self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
427 (False, "node state is ('unpaired', 'open', 'boot wait', 'idle exceeded')"))
def test_shutdown_without_arvados_node(self):
    # A monitor with no Arvados node and start_time=0 should report itself
    # eligible for shutdown once the shutdown state is set to (True, 600).
    self.make_actor(start_time=0)
    self.shutdowns._set_state(True, 600)
    # assertEqual: the assertEquals alias is deprecated and removed in
    # Python 3.12.
    self.assertEqual((True, "node state is ('down', 'open', 'boot exceeded', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_shutdown_missing(self):
    # A 'down' node whose last ping is in 1970 should be shutdown-eligible.
    arv_node = testutil.arvados_node_mock(10, job_uuid=None,
                                          crunch_worker_state="down",
                                          last_ping_at='1970-01-01T01:02:03.04050607Z')
    self.make_actor(10, arv_node)
    self.shutdowns._set_state(True, 600)
    # assertEqual: the assertEquals alias is deprecated and removed in
    # Python 3.12.
    self.assertEqual((True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_shutdown_running_broken(self):
    # A 'down' node that the cloud client reports as broken should be
    # shutdown-eligible.
    arv_node = testutil.arvados_node_mock(12, job_uuid=None,
                                          crunch_worker_state="down")
    self.make_actor(12, arv_node)
    self.shutdowns._set_state(True, 600)
    self.cloud_client.broken.return_value = True
    # assertEqual: the assertEquals alias is deprecated and removed in
    # Python 3.12.
    self.assertEqual((True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_shutdown_missing_broken(self):
    # A 'down' node with a 1970 last ping that the cloud client also reports
    # as broken should be shutdown-eligible.
    arv_node = testutil.arvados_node_mock(11, job_uuid=None,
                                          crunch_worker_state="down",
                                          last_ping_at='1970-01-01T01:02:03.04050607Z')
    self.make_actor(11, arv_node)
    self.shutdowns._set_state(True, 600)
    self.cloud_client.broken.return_value = True
    # assertEqual replaces the deprecated assertEquals alias (removed in
    # Python 3.12); expected value goes first, matching the sibling tests.
    self.assertEqual((True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_when_window_closed(self):
    # With the default mock shutdown state (False, 300) the node must not be
    # shutdown-eligible, and the reason should report the window as 'closed'.
    self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
    # assertEqual: the assertEquals alias is deprecated and removed in
    # Python 3.12.
    self.assertEqual((False, "node state is ('idle', 'closed', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_when_node_running_job(self):
    # A node with a job must not be shutdown-eligible even when the shutdown
    # state is set to (True, 600).
    self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
    self.shutdowns._set_state(True, 600)
    # assertEqual: the assertEquals alias is deprecated and removed in
    # Python 3.12.
    self.assertEqual((False, "node state is ('busy', 'open', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_shutdown_when_node_state_unknown(self):
    # A node with crunch_worker_state=None should be reported as 'idle' and
    # shutdown-eligible.
    self.make_actor(5, testutil.arvados_node_mock(
        5, crunch_worker_state=None))
    self.shutdowns._set_state(True, 600)
    # assertEqual: the assertEquals alias is deprecated and removed in
    # Python 3.12.
    self.assertEqual((True, "node state is ('idle', 'open', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_shutdown_when_node_state_fail(self):
    # A node with crunch_worker_state='fail' should be shutdown-eligible.
    self.make_actor(5, testutil.arvados_node_mock(
        5, crunch_worker_state='fail'))
    self.shutdowns._set_state(True, 600)
    # assertEqual: the assertEquals alias is deprecated and removed in
    # Python 3.12.
    self.assertEqual((True, "node state is ('fail', 'open', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_when_node_state_stale(self):
    # A node record with age=90000 must not be shutdown-eligible; the reason
    # should say the state is stale.
    self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
    self.shutdowns._set_state(True, 600)
    # assertEqual: the assertEquals alias is deprecated and removed in
    # Python 3.12.
    self.assertEqual((False, "node state is stale"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
493 def test_arvados_node_match(self):
495 arv_node = testutil.arvados_node_mock(
496 2, hostname='compute-two.zzzzz.arvadosapi.com')
497 self.cloud_client.node_id.return_value = '2'
498 pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
499 self.assertEqual(self.cloud_mock.id, pair_id)
500 self.stop_proxy(self.node_actor)
501 self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
503 def test_arvados_node_mismatch(self):
505 arv_node = testutil.arvados_node_mock(1)
507 self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
509 def test_arvados_node_mismatch_first_ping_too_early(self):
511 arv_node = testutil.arvados_node_mock(
512 4, first_ping_at='1971-03-02T14:15:16.1717282Z')
514 self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
516 def test_update_cloud_node(self):
519 self.cloud_mock.id = '1'
520 self.node_actor.update_cloud_node(self.cloud_mock)
521 current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
522 self.assertEqual([testutil.ip_address_mock(2)],
523 current_cloud.private_ips)
525 def test_missing_cloud_node_update(self):
527 self.node_actor.update_cloud_node(None)
528 current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
529 self.assertEqual([testutil.ip_address_mock(1)],
530 current_cloud.private_ips)
532 def test_update_arvados_node(self):
534 job_uuid = 'zzzzz-jjjjj-updatejobnode00'
535 new_arvados = testutil.arvados_node_mock(3, job_uuid)
536 self.node_actor.update_arvados_node(new_arvados)
537 current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
538 self.assertEqual(job_uuid, current_arvados['job_uuid'])
def test_missing_arvados_node_update(self):
    # Updating the monitor with None must leave the existing Arvados node
    # record (and its ip_address) in place.
    original_node = testutil.arvados_node_mock(4)
    self.make_actor(4, original_node)
    self.node_actor.update_arvados_node(None)
    node_after_update = self.node_actor.arvados_node.get(self.TIMEOUT)
    expected_ip = testutil.ip_address_mock(4)
    self.assertEqual(expected_ip, node_after_update['ip_address'])
547 def test_update_arvados_node_calls_sync_node(self):
549 self.cloud_mock.extra['testname'] = 'cloudfqdn.zzzzz.arvadosapi.com'
551 arv_node = testutil.arvados_node_mock(5)
552 self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
553 self.assertEqual(1, self.updates.sync_node.call_count)