3 from __future__ import absolute_import, print_function
8 import arvados.errors as arverror
13 import arvnodeman.computenode.dispatch as dispatch
14 from . import testutil
16 class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
17 def make_mocks(self, arvados_effect=None, cloud_effect=None):
18 if arvados_effect is None:
19 arvados_effect = [testutil.arvados_node_mock()]
20 self.arvados_effect = arvados_effect
21 self.timer = testutil.MockTimer()
22 self.api_client = mock.MagicMock(name='api_client')
23 self.api_client.nodes().create().execute.side_effect = arvados_effect
24 self.api_client.nodes().update().execute.side_effect = arvados_effect
25 self.cloud_client = mock.MagicMock(name='cloud_client')
26 self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
28 def make_actor(self, arv_node=None):
29 if not hasattr(self, 'timer'):
30 self.make_mocks(arvados_effect=[arv_node])
31 self.setup_actor = dispatch.ComputeNodeSetupActor.start(
32 self.timer, self.api_client, self.cloud_client,
33 testutil.MockSize(1), arv_node).proxy()
35 def test_creation_without_arvados_node(self):
37 self.assertEqual(self.arvados_effect[-1],
38 self.setup_actor.arvados_node.get(self.TIMEOUT))
39 self.assertTrue(self.api_client.nodes().create().execute.called)
40 self.assertEqual(self.cloud_client.create_node(),
41 self.setup_actor.cloud_node.get(self.TIMEOUT))
43 def test_creation_with_arvados_node(self):
44 self.make_actor(testutil.arvados_node_mock())
45 self.assertEqual(self.arvados_effect[-1],
46 self.setup_actor.arvados_node.get(self.TIMEOUT))
47 self.assertTrue(self.api_client.nodes().update().execute.called)
48 self.assertEqual(self.cloud_client.create_node(),
49 self.setup_actor.cloud_node.get(self.TIMEOUT))
51 def test_failed_calls_retried(self):
53 arverror.ApiError(httplib2.Response({'status': '500'}), ""),
54 testutil.arvados_node_mock(),
57 self.wait_for_assignment(self.setup_actor, 'cloud_node')
59 def test_stop_when_no_cloud_node(self):
61 arverror.ApiError(httplib2.Response({'status': '500'}), ""))
63 self.setup_actor.stop_if_no_cloud_node()
65 self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
67 def test_no_stop_when_cloud_node(self):
69 self.wait_for_assignment(self.setup_actor, 'cloud_node')
70 self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
71 self.assertTrue(self.stop_proxy(self.setup_actor),
72 "actor was stopped by stop_if_no_cloud_node")
74 def test_subscribe(self):
76 arverror.ApiError(httplib2.Response({'status': '500'}), ""))
78 subscriber = mock.Mock(name='subscriber_mock')
79 self.setup_actor.subscribe(subscriber)
80 self.api_client.nodes().create().execute.side_effect = [
81 testutil.arvados_node_mock()]
82 self.wait_for_assignment(self.setup_actor, 'cloud_node')
83 self.assertEqual(self.setup_actor.actor_ref.actor_urn,
84 subscriber.call_args[0][0].actor_ref.actor_urn)
86 def test_late_subscribe(self):
88 subscriber = mock.Mock(name='subscriber_mock')
89 self.wait_for_assignment(self.setup_actor, 'cloud_node')
90 self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
91 self.stop_proxy(self.setup_actor)
92 self.assertEqual(self.setup_actor.actor_ref.actor_urn,
93 subscriber.call_args[0][0].actor_ref.actor_urn)
96 class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
98 def make_mocks(self, cloud_node=None):
99 self.timer = testutil.MockTimer()
100 self.cloud_client = mock.MagicMock(name='cloud_client')
101 if cloud_node is None:
102 cloud_node = testutil.cloud_node_mock()
103 self.cloud_node = cloud_node
105 def make_actor(self, arv_node=None):
106 if not hasattr(self, 'timer'):
108 self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
109 self.timer, self.cloud_client, self.cloud_node).proxy()
111 def test_easy_shutdown(self):
113 self.shutdown_actor.cloud_node.get(self.TIMEOUT)
114 self.stop_proxy(self.shutdown_actor)
115 self.assertTrue(self.cloud_client.destroy_node.called)
117 def test_late_subscribe(self):
119 subscriber = mock.Mock(name='subscriber_mock')
120 self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
121 self.stop_proxy(self.shutdown_actor)
122 self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
123 subscriber.call_args[0][0].actor_ref.actor_urn)
126 class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
128 def make_actor(self):
129 self.driver = mock.MagicMock(name='driver_mock')
130 self.updater = dispatch.ComputeNodeUpdateActor.start(self.driver).proxy()
132 def test_node_sync(self):
134 cloud_node = testutil.cloud_node_mock()
135 arv_node = testutil.arvados_node_mock()
136 self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
137 self.driver().sync_node.assert_called_with(cloud_node, arv_node)
140 class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
142 class MockShutdownTimer(object):
143 def _set_state(self, is_open, next_opening):
144 self.window_open = lambda: is_open
145 self.next_opening = lambda: next_opening
148 def make_mocks(self, node_num):
149 self.shutdowns = self.MockShutdownTimer()
150 self.shutdowns._set_state(False, 300)
151 self.timer = mock.MagicMock(name='timer_mock')
152 self.updates = mock.MagicMock(name='update_mock')
153 self.cloud_mock = testutil.cloud_node_mock(node_num)
154 self.subscriber = mock.Mock(name='subscriber_mock')
156 def make_actor(self, node_num=1, arv_node=None, start_time=None):
157 if not hasattr(self, 'cloud_mock'):
158 self.make_mocks(node_num)
159 if start_time is None:
160 start_time = time.time()
161 self.node_actor = dispatch.ComputeNodeMonitorActor.start(
162 self.cloud_mock, start_time, self.shutdowns, self.timer,
163 self.updates, arv_node).proxy()
164 self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
166 def node_state(self, *states):
167 return self.node_actor.in_state(*states).get(self.TIMEOUT)
169 def test_in_state_when_unpaired(self):
171 self.assertIsNone(self.node_state('idle', 'alloc'))
173 def test_in_state_when_pairing_stale(self):
174 self.make_actor(arv_node=testutil.arvados_node_mock(
175 job_uuid=None, age=90000))
176 self.assertIsNone(self.node_state('idle', 'alloc'))
178 def test_in_state_when_no_state_available(self):
179 self.make_actor(arv_node=testutil.arvados_node_mock(info={}))
180 self.assertIsNone(self.node_state('idle', 'alloc'))
182 def test_in_idle_state(self):
183 self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
184 self.assertTrue(self.node_state('idle'))
185 self.assertFalse(self.node_state('alloc'))
186 self.assertTrue(self.node_state('idle', 'alloc'))
188 def test_in_alloc_state(self):
189 self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
190 self.assertFalse(self.node_state('idle'))
191 self.assertTrue(self.node_state('alloc'))
192 self.assertTrue(self.node_state('idle', 'alloc'))
194 def test_init_shutdown_scheduling(self):
196 self.assertTrue(self.timer.schedule.called)
197 self.assertEqual(300, self.timer.schedule.call_args[0][0])
199 def test_shutdown_subscription(self):
201 self.shutdowns._set_state(True, 600)
202 self.node_actor.consider_shutdown().get(self.TIMEOUT)
203 self.assertTrue(self.subscriber.called)
204 self.assertEqual(self.node_actor.actor_ref.actor_urn,
205 self.subscriber.call_args[0][0].actor_ref.actor_urn)
207 def test_shutdown_without_arvados_node(self):
209 self.shutdowns._set_state(True, 600)
210 self.node_actor.consider_shutdown().get(self.TIMEOUT)
211 self.assertTrue(self.subscriber.called)
213 def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
214 self.make_actor(start_time=0)
215 self.shutdowns._set_state(True, 600)
216 self.node_actor.consider_shutdown().get(self.TIMEOUT)
217 self.assertFalse(self.subscriber.called)
219 def check_shutdown_rescheduled(self, window_open, next_window,
221 self.shutdowns._set_state(window_open, next_window)
222 self.timer.schedule.reset_mock()
223 self.node_actor.consider_shutdown().get(self.TIMEOUT)
224 self.stop_proxy(self.node_actor)
225 self.assertTrue(self.timer.schedule.called)
226 if schedule_time is not None:
227 self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
228 self.assertFalse(self.subscriber.called)
230 def test_shutdown_window_close_scheduling(self):
232 self.check_shutdown_rescheduled(False, 600, 600)
234 def test_no_shutdown_when_node_running_job(self):
235 self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
236 self.check_shutdown_rescheduled(True, 600)
238 def test_no_shutdown_when_node_state_unknown(self):
239 self.make_actor(5, testutil.arvados_node_mock(5, info={}))
240 self.check_shutdown_rescheduled(True, 600)
242 def test_no_shutdown_when_node_state_stale(self):
243 self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
244 self.check_shutdown_rescheduled(True, 600)
246 def test_arvados_node_match(self):
248 arv_node = testutil.arvados_node_mock(
249 2, hostname='compute-two.zzzzz.arvadosapi.com')
250 pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
251 self.assertEqual(self.cloud_mock.id, pair_id)
252 self.stop_proxy(self.node_actor)
253 self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
255 def test_arvados_node_mismatch(self):
257 arv_node = testutil.arvados_node_mock(1)
259 self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
261 def test_update_cloud_node(self):
264 self.cloud_mock.id = '1'
265 self.node_actor.update_cloud_node(self.cloud_mock)
266 current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
267 self.assertEqual([testutil.ip_address_mock(2)],
268 current_cloud.private_ips)
270 def test_missing_cloud_node_update(self):
272 self.node_actor.update_cloud_node(None)
273 current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
274 self.assertEqual([testutil.ip_address_mock(1)],
275 current_cloud.private_ips)
277 def test_update_arvados_node(self):
279 job_uuid = 'zzzzz-jjjjj-updatejobnode00'
280 new_arvados = testutil.arvados_node_mock(3, job_uuid)
281 self.node_actor.update_arvados_node(new_arvados)
282 current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
283 self.assertEqual(job_uuid, current_arvados['job_uuid'])
285 def test_missing_arvados_node_update(self):
286 self.make_actor(4, testutil.arvados_node_mock(4))
287 self.node_actor.update_arvados_node(None)
288 current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
289 self.assertEqual(testutil.ip_address_mock(4),
290 current_arvados['ip_address'])