6e134375bb8aec05bdd71f830e28f277d3cff5b5
[arvados.git] / services / nodemanager / tests / testutil.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import contextlib
9 import datetime
10 import mock
11 import pykka
12 import sys
13 import threading
14 import time
15
16 import libcloud.common.types as cloud_types
17
18 from . import pykka_timeout
19
20 no_sleep = mock.patch('time.sleep', lambda n: None)
21
22 def arvados_node_mock(node_num=99, job_uuid=None, age=-1, **kwargs):
23     mod_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=age)
24     mod_time_s = mod_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
25     if job_uuid is True:
26         job_uuid = 'zzzzz-jjjjj-jobjobjobjobjob'
27     crunch_worker_state = 'idle' if (job_uuid is None) else 'busy'
28     node = {'uuid': 'zzzzz-yyyyy-{:015x}'.format(node_num),
29             'created_at': '2014-01-01T01:02:03.04050607Z',
30             'modified_at': mod_time_s,
31             'first_ping_at': kwargs.pop('first_ping_at', mod_time_s),
32             'last_ping_at': mod_time_s,
33             'slot_number': node_num,
34             'hostname': 'compute{}'.format(node_num),
35             'domain': 'zzzzz.arvadosapi.com',
36             'ip_address': ip_address_mock(node_num),
37             'job_uuid': job_uuid,
38             'crunch_worker_state': crunch_worker_state,
39             'properties': {},
40             'info': {'ping_secret': 'defaulttestsecret', 'ec2_instance_id': str(node_num)}}
41     node.update(kwargs)
42     return node
43
44 def cloud_object_mock(name_id, **extra):
45     # A very generic mock, useful for stubbing libcloud objects we
46     # only search for and pass around, like locations, subnets, etc.
47     cloud_object = mock.NonCallableMagicMock(['id', 'name'],
48                                              name='cloud_object')
49     cloud_object.name = str(name_id)
50     cloud_object.id = 'id_' + cloud_object.name
51     cloud_object.extra = extra
52     return cloud_object
53
54
55 def cloud_node_fqdn(node):
56     # We intentionally put the FQDN somewhere goofy to make sure tested code is
57     # using this function for lookups.
58     return node.extra.get('testname', 'NoTestName')
59
60 def ip_address_mock(last_octet):
61     return '10.20.30.{}'.format(last_octet)
62
63 @contextlib.contextmanager
64 def redirected_streams(stdout=None, stderr=None):
65     orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
66     orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
67     try:
68         yield
69     finally:
70         sys.stdout = orig_stdout
71         sys.stderr = orig_stderr
72
73
74 class MockShutdownTimer(object):
75     def _set_state(self, is_open, next_opening):
76         self.window_open = lambda: is_open
77         self.next_opening = lambda: next_opening
78
79
80 class MockSize(object):
81     def __init__(self, factor):
82         self.id = 'z{}.test'.format(factor)
83         self.name = self.id
84         self.ram = 128 * factor
85         self.disk = factor   # GB
86         self.scratch = 1000 * factor # MB
87         self.bandwidth = 16 * factor
88         self.price = float(factor)
89         self.extra = {}
90
91     def __eq__(self, other):
92         return self.id == other.id
93
94
95 class MockTimer(object):
96     def __init__(self, deliver_immediately=True):
97         self.deliver_immediately = deliver_immediately
98         self.messages = []
99         self.lock = threading.Lock()
100
101     def deliver(self):
102         with self.lock:
103             to_deliver = self.messages
104             self.messages = []
105         for callback, args, kwargs in to_deliver:
106             try:
107                 callback(*args, **kwargs)
108             except pykka.ActorDeadError:
109                 pass
110
111     def schedule(self, want_time, callback, *args, **kwargs):
112         with self.lock:
113             self.messages.append((callback, args, kwargs))
114         if self.deliver_immediately:
115             self.deliver()
116
117
118 class ActorTestMixin(object):
119     FUTURE_CLASS = pykka.ThreadingFuture
120     TIMEOUT = pykka_timeout
121
122     def tearDown(self):
123         pykka.ActorRegistry.stop_all()
124
125     def stop_proxy(self, proxy):
126         th = proxy.get_thread().get()
127         t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
128         th.join()
129         return t
130
131     def wait_for_assignment(self, proxy, attr_name, unassigned=None,
132                             timeout=TIMEOUT):
133         deadline = time.time() + timeout
134         while True:
135             loop_timeout = deadline - time.time()
136             if loop_timeout <= 0:
137                 self.fail("actor did not assign {} in time".format(attr_name))
138             result = getattr(proxy, attr_name).get(loop_timeout)
139             if result is not unassigned:
140                 return result
141
142     def busywait(self, f, finalize=None):
143         n = 0
144         while not f() and n < 20:
145             time.sleep(.1)
146             n += 1
147         if finalize is not None:
148             finalize()
149         self.assertTrue(f())
150
151
152 class DriverTestMixin(object):
153     def setUp(self):
154         self.driver_mock = mock.MagicMock(name='driver_mock')
155         super(DriverTestMixin, self).setUp()
156
157     def new_driver(self, auth_kwargs={}, list_kwargs={}, create_kwargs={}):
158         create_kwargs.setdefault('ping_host', '100::')
159         return self.TEST_CLASS(
160             auth_kwargs, list_kwargs, create_kwargs,
161             driver_class=self.driver_mock)
162
163     def driver_method_args(self, method_name):
164         return getattr(self.driver_mock(), method_name).call_args
165
166     def test_driver_create_retry(self):
167         with mock.patch('time.sleep'):
168             driver_mock2 = mock.MagicMock(name='driver_mock2')
169             self.driver_mock.side_effect = (Exception("oops"), driver_mock2)
170             kwargs = {'user_id': 'foo'}
171             driver = self.new_driver(auth_kwargs=kwargs)
172             self.assertTrue(self.driver_mock.called)
173             self.assertIs(driver.real, driver_mock2)
174
175     def test_create_can_find_node_after_timeout(self, create_kwargs={}, node_extra={}):
176         driver = self.new_driver(create_kwargs=create_kwargs)
177         arv_node = arvados_node_mock()
178         cloud_node = cloud_node_mock(**node_extra)
179         cloud_node.name = driver.create_cloud_name(arv_node)
180         create_method = self.driver_mock().create_node
181         create_method.side_effect = cloud_types.LibcloudError("fake timeout")
182         list_method = self.driver_mock().list_nodes
183         list_method.return_value = [cloud_node]
184         actual = driver.create_node(MockSize(1), arv_node)
185         self.assertIs(cloud_node, actual)
186
187     def test_create_can_raise_exception_after_timeout(self):
188         driver = self.new_driver()
189         arv_node = arvados_node_mock()
190         create_method = self.driver_mock().create_node
191         create_method.side_effect = cloud_types.LibcloudError("fake timeout")
192         list_method = self.driver_mock().list_nodes
193         list_method.return_value = []
194         with self.assertRaises(cloud_types.LibcloudError) as exc_test:
195             driver.create_node(MockSize(1), arv_node)
196         self.assertIs(create_method.side_effect, exc_test.exception)
197
198     def check_node_found_after_timeout_has_fixed_size(self, size, cloud_node,
199                                                       create_kwargs={}):
200         # This method needs to be called explicitly by driver test suites
201         # that need it.
202         self.driver_mock().list_sizes.return_value = [size]
203         driver = self.new_driver(create_kwargs=create_kwargs)
204         arv_node = arvados_node_mock()
205         cloud_node.name = driver.create_cloud_name(arv_node)
206         create_method = self.driver_mock().create_node
207         create_method.side_effect = cloud_types.LibcloudError("fake timeout")
208         self.driver_mock().list_nodes.return_value = [cloud_node]
209         actual = driver.create_node(size, arv_node)
210         self.assertIs(size, actual.size)
211
212
213 class RemotePollLoopActorTestMixin(ActorTestMixin):
214     def build_monitor(self, *args, **kwargs):
215         self.timer = mock.MagicMock(name='timer_mock')
216         self.client = mock.MagicMock(name='client_mock')
217         self.subscriber = mock.Mock(name='subscriber_mock')
218         self.monitor = self.TEST_CLASS.start(
219             self.client, self.timer, *args, **kwargs).proxy()
220
221 def cloud_node_mock(node_num=99, size=None, **extra):
222     if size is None:
223         size = MockSize(node_num)
224     node = mock.NonCallableMagicMock(
225         ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
226          'image', 'extra'],
227         name='cloud_node')
228     node.id = str(node_num)
229     node.name = node.id
230     node.size = size
231     node.public_ips = []
232     node.private_ips = [ip_address_mock(node_num)]
233     node.extra = extra
234     return node