2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
16 import libcloud.common.types as cloud_types
18 from . import pykka_timeout
# Reusable patcher: while active, time.sleep is replaced by a one-argument
# no-op that returns None immediately.
no_sleep = mock.patch('time.sleep', new=lambda n: None)
22 def arvados_node_mock(node_num=99, job_uuid=None, age=-1, **kwargs):
23 mod_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=age)
24 mod_time_s = mod_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
26 job_uuid = 'zzzzz-jjjjj-jobjobjobjobjob'
27 crunch_worker_state = 'idle' if (job_uuid is None) else 'busy'
28 node = {'uuid': 'zzzzz-yyyyy-{:015x}'.format(node_num),
29 'created_at': '2014-01-01T01:02:03.04050607Z',
30 'modified_at': mod_time_s,
31 'first_ping_at': kwargs.pop('first_ping_at', mod_time_s),
32 'last_ping_at': mod_time_s,
33 'slot_number': node_num,
34 'hostname': 'compute{}'.format(node_num),
35 'domain': 'zzzzz.arvadosapi.com',
36 'ip_address': ip_address_mock(node_num),
38 'crunch_worker_state': crunch_worker_state,
40 'info': {'ping_secret': 'defaulttestsecret', 'ec2_instance_id': str(node_num)}}
44 def cloud_object_mock(name_id, **extra):
45 # A very generic mock, useful for stubbing libcloud objects we
46 # only search for and pass around, like locations, subnets, etc.
47 cloud_object = mock.NonCallableMagicMock(['id', 'name'],
49 cloud_object.name = str(name_id)
50 cloud_object.id = 'id_' + cloud_object.name
51 cloud_object.extra = extra
def cloud_node_fqdn(node):
    """Return the FQDN that the test suite associates with a cloud node.

    The value is read from the 'testname' key of ``node.extra``.  When that
    key is absent, a placeholder derived from ``node.name`` is returned.
    """
    # The FQDN is deliberately stored in an unusual place so that tested code
    # only finds it by going through this function.
    placeholder = node.name + '.NoTestName.invalid'
    return node.extra.get('testname', placeholder)
def ip_address_mock(last_octet):
    """Return the string '10.20.30.' followed by the formatted `last_octet`."""
    address_template = '10.20.30.{}'
    return address_template.format(last_octet)
@contextlib.contextmanager
def redirected_streams(stdout=None, stderr=None):
    """Temporarily replace sys.stdout and/or sys.stderr.

    Args:
        stdout: object to install as sys.stdout; a falsy value leaves
            sys.stdout as it is.
        stderr: object to install as sys.stderr; a falsy value leaves
            sys.stderr as it is.

    On exit from the ``with`` block both streams are restored to the objects
    that were installed on entry, even when the block raises.
    """
    orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
    orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
    try:
        # A contextmanager generator must yield exactly once; the body of
        # the caller's ``with`` statement runs here.
        yield
    finally:
        # Restore unconditionally so a failing block cannot leak the
        # redirection into later code.
        sys.stdout = orig_stdout
        sys.stderr = orig_stderr
class MockShutdownTimer(object):
    """Stand-in shutdown timer whose answers are fixed directly by the test."""

    def _set_state(self, is_open, next_opening):
        """Install zero-argument window_open() and next_opening() callables.

        After this call, ``self.window_open()`` returns `is_open` and
        ``self.next_opening()`` returns `next_opening`.
        """
        def report_window_open():
            return is_open

        def report_next_opening():
            return next_opening

        self.window_open = report_window_open
        self.next_opening = report_next_opening
class MockSize(object):
    """Fake node size whose attributes are all derived from one factor.

    Two MockSize objects compare equal when their ``id`` strings match, and
    equal objects hash alike so they can be used in sets and as dict keys.
    """

    def __init__(self, factor, preemptible=False):
        """Build a size scaled by `factor`.

        Args:
            factor: number used to derive the id, name and every numeric
                attribute.
            preemptible: stored unchanged as ``self.preemptible``.
        """
        self.id = 'z{}.test'.format(factor)
        self.name = 'test size '+self.id
        self.ram = 128 * factor
        self.disk = factor   # GB
        self.scratch = 1000 * factor  # MB
        self.bandwidth = 16 * factor
        self.price = float(factor)
        self.preemptible = preemptible

    def __eq__(self, other):
        """Compare by ``id``; defer to the other operand if it has none."""
        try:
            return self.id == other.id
        except AttributeError:
            # Not size-like: let Python fall back to its default comparison
            # instead of raising out of ``==``.
            return NotImplemented

    def __ne__(self, other):
        """Inverse of __eq__ (Python 2 does not derive this automatically)."""
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    def __hash__(self):
        """Hash on ``id`` so that equal sizes share a hash value."""
        return hash(self.id)
97 class MockTimer(object):
98 def __init__(self, deliver_immediately=True):
99 self.deliver_immediately = deliver_immediately
101 self.lock = threading.Lock()
105 to_deliver = self.messages
107 for callback, args, kwargs in to_deliver:
109 callback(*args, **kwargs)
110 except pykka.ActorDeadError:
113 def schedule(self, want_time, callback, *args, **kwargs):
115 self.messages.append((callback, args, kwargs))
116 if self.deliver_immediately:
120 class ActorTestMixin(object):
121 FUTURE_CLASS = pykka.ThreadingFuture
122 TIMEOUT = pykka_timeout
125 pykka.ActorRegistry.stop_all()
127 def stop_proxy(self, proxy):
128 th = proxy.get_thread().get()
129 t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
133 def wait_for_assignment(self, proxy, attr_name, unassigned=None,
135 deadline = time.time() + timeout
137 loop_timeout = deadline - time.time()
138 if loop_timeout <= 0:
139 self.fail("actor did not assign {} in time".format(attr_name))
140 result = getattr(proxy, attr_name).get(loop_timeout)
141 if result is not unassigned:
144 def busywait(self, f, finalize=None):
146 while not f() and n < 20:
149 if finalize is not None:
154 class DriverTestMixin(object):
156 self.driver_mock = mock.MagicMock(name='driver_mock')
157 super(DriverTestMixin, self).setUp()
def new_driver(self, auth_kwargs=None, list_kwargs=None, create_kwargs=None):
    """Instantiate self.TEST_CLASS wired to self.driver_mock.

    Args:
        auth_kwargs: passed as the first positional argument; defaults to
            an empty dict.
        list_kwargs: passed as the second positional argument; defaults to
            an empty dict.
        create_kwargs: passed as the third positional argument after a
            'ping_host' entry of '100::' is added when the key is absent.

    Returns:
        Whatever ``self.TEST_CLASS(...)`` returns.
    """
    if auth_kwargs is None:
        auth_kwargs = {}
    if list_kwargs is None:
        list_kwargs = {}
    # Fill in the default on a copy: the caller's dict must not be changed
    # as a side effect, and no state may be shared between calls.
    create_kwargs = dict(create_kwargs or {})
    create_kwargs.setdefault('ping_host', '100::')
    return self.TEST_CLASS(
        auth_kwargs, list_kwargs, create_kwargs,
        driver_class=self.driver_mock)
def driver_method_args(self, method_name):
    """Return the ``call_args`` of `method_name` on ``self.driver_mock()``."""
    driver_instance = self.driver_mock()
    method = getattr(driver_instance, method_name)
    return method.call_args
def test_driver_create_retry(self):
    """A driver class that raises on its first call is called again.

    The driver class mock raises on its first call and returns a second mock
    on the next one; the resulting driver's ``real`` attribute must be that
    second mock.
    """
    replacement = mock.MagicMock(name='driver_mock2')
    self.driver_mock.side_effect = (Exception("oops"), replacement)
    auth = {'user_id': 'foo'}
    # time.sleep is patched for the duration of driver construction.
    with mock.patch('time.sleep'):
        driver = self.new_driver(auth_kwargs=auth)
    self.assertTrue(self.driver_mock.called)
    self.assertIs(driver.real, replacement)
def test_create_can_find_node_after_timeout(self, create_kwargs={}, node_extra={}):
    """create_node returns the listed node when the create call raises.

    The libcloud create_node mock raises LibcloudError while list_nodes
    reports a node carrying the expected cloud name; the driver must hand
    back that very node object.
    """
    driver = self.new_driver(create_kwargs=create_kwargs)
    arvados_record = arvados_node_mock()
    expected_node = cloud_node_mock(**node_extra)
    expected_node.name = driver.create_cloud_name(arvados_record)
    # The creation request fails ...
    failing_create = self.driver_mock().create_node
    failing_create.side_effect = cloud_types.LibcloudError("fake timeout")
    # ... yet a subsequent listing shows the node.
    listing = self.driver_mock().list_nodes
    listing.return_value = [expected_node]
    found = driver.create_node(MockSize(1), arvados_record)
    self.assertIs(expected_node, found)
def test_create_can_raise_exception_after_timeout(self):
    """create_node re-raises the original error when no node is listed.

    The libcloud create_node mock raises LibcloudError and list_nodes
    returns an empty list; the exception reaching the caller must be the
    same object the mock raised.
    """
    driver = self.new_driver()
    arvados_record = arvados_node_mock()
    failing_create = self.driver_mock().create_node
    failing_create.side_effect = cloud_types.LibcloudError("fake timeout")
    listing = self.driver_mock().list_nodes
    listing.return_value = []
    with self.assertRaises(cloud_types.LibcloudError) as caught:
        driver.create_node(MockSize(1), arvados_record)
    self.assertIs(failing_create.side_effect, caught.exception)
def check_node_found_after_timeout_has_fixed_size(self, size, cloud_node,
                                                  create_kwargs=None):
    """Check that a node recovered after a create failure reports `size`.

    Args:
        size: size object requested from the driver; it is also the only
            entry returned by the mocked list_sizes.
        cloud_node: node object that the mocked list_nodes returns; its
            ``name`` is overwritten with the driver's cloud name.
        create_kwargs: optional dict forwarded to new_driver; treated as
            an empty dict when omitted.
    """
    # This method needs to be called explicitly by driver test suites:
    # its name lacks the test_ prefix, so it is presumably not collected
    # automatically.
    if create_kwargs is None:
        create_kwargs = {}
    self.driver_mock().list_sizes.return_value = [size]
    driver = self.new_driver(create_kwargs=create_kwargs)
    arv_node = arvados_node_mock()
    cloud_node.name = driver.create_cloud_name(arv_node)
    create_method = self.driver_mock().create_node
    create_method.side_effect = cloud_types.LibcloudError("fake timeout")
    self.driver_mock().list_nodes.return_value = [cloud_node]
    actual = driver.create_node(size, arv_node)
    self.assertIs(size, actual.size)
class RemotePollLoopActorTestMixin(ActorTestMixin):
    """Mixin for tests of actors started via ``self.TEST_CLASS.start``."""

    def build_monitor(self, *args, **kwargs):
        """Create fresh mocks and start the actor under test.

        Sets ``self.client``, ``self.timer`` and ``self.subscriber`` to new
        mocks, starts ``self.TEST_CLASS`` with the client and timer followed
        by any extra arguments, and stores the actor's proxy on
        ``self.monitor``.
        """
        self.client = mock.MagicMock(name='client_mock')
        self.timer = mock.MagicMock(name='timer_mock')
        self.subscriber = mock.Mock(name='subscriber_mock')
        actor_ref = self.TEST_CLASS.start(
            self.client, self.timer, *args, **kwargs)
        self.monitor = actor_ref.proxy()
223 def cloud_node_mock(node_num=99, size=None, **extra):
225 size = MockSize(node_num)
226 node = mock.NonCallableMagicMock(
227 ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
230 node.id = str(node_num)
234 node.private_ips = [ip_address_mock(node_num)]