10383: Merge branch 'master' into 10383-arv-put-incremental-upload
[arvados.git] / services / nodemanager / tests / testutil.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import contextlib
6 import datetime
7 import mock
8 import pykka
9 import sys
10 import threading
11 import time
12
13 import libcloud.common.types as cloud_types
14
15 from . import pykka_timeout
16
17 no_sleep = mock.patch('time.sleep', lambda n: None)
18
19 def arvados_node_mock(node_num=99, job_uuid=None, age=-1, **kwargs):
20     mod_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=age)
21     mod_time_s = mod_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
22     if job_uuid is True:
23         job_uuid = 'zzzzz-jjjjj-jobjobjobjobjob'
24     crunch_worker_state = 'idle' if (job_uuid is None) else 'busy'
25     node = {'uuid': 'zzzzz-yyyyy-{:015x}'.format(node_num),
26             'created_at': '2014-01-01T01:02:03.04050607Z',
27             'modified_at': mod_time_s,
28             'first_ping_at': kwargs.pop('first_ping_at', mod_time_s),
29             'last_ping_at': mod_time_s,
30             'slot_number': node_num,
31             'hostname': 'compute{}'.format(node_num),
32             'domain': 'zzzzz.arvadosapi.com',
33             'ip_address': ip_address_mock(node_num),
34             'job_uuid': job_uuid,
35             'crunch_worker_state': crunch_worker_state,
36             'properties': {},
37             'info': {'ping_secret': 'defaulttestsecret', 'ec2_instance_id': str(node_num)}}
38     node.update(kwargs)
39     return node
40
41 def cloud_object_mock(name_id, **extra):
42     # A very generic mock, useful for stubbing libcloud objects we
43     # only search for and pass around, like locations, subnets, etc.
44     cloud_object = mock.NonCallableMagicMock(['id', 'name'],
45                                              name='cloud_object')
46     cloud_object.name = str(name_id)
47     cloud_object.id = 'id_' + cloud_object.name
48     cloud_object.extra = extra
49     return cloud_object
50
51
52 def cloud_node_fqdn(node):
53     # We intentionally put the FQDN somewhere goofy to make sure tested code is
54     # using this function for lookups.
55     return node.extra.get('testname', 'NoTestName')
56
57 def ip_address_mock(last_octet):
58     return '10.20.30.{}'.format(last_octet)
59
60 @contextlib.contextmanager
61 def redirected_streams(stdout=None, stderr=None):
62     orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
63     orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
64     try:
65         yield
66     finally:
67         sys.stdout = orig_stdout
68         sys.stderr = orig_stderr
69
70
71 class MockShutdownTimer(object):
72     def _set_state(self, is_open, next_opening):
73         self.window_open = lambda: is_open
74         self.next_opening = lambda: next_opening
75
76
77 class MockSize(object):
78     def __init__(self, factor):
79         self.id = 'z{}.test'.format(factor)
80         self.name = self.id
81         self.ram = 128 * factor
82         self.disk = 100 * factor
83         self.bandwidth = 16 * factor
84         self.price = float(factor)
85         self.extra = {}
86
87     def __eq__(self, other):
88         return self.id == other.id
89
90
91 class MockTimer(object):
92     def __init__(self, deliver_immediately=True):
93         self.deliver_immediately = deliver_immediately
94         self.messages = []
95         self.lock = threading.Lock()
96
97     def deliver(self):
98         with self.lock:
99             to_deliver = self.messages
100             self.messages = []
101         for callback, args, kwargs in to_deliver:
102             try:
103                 callback(*args, **kwargs)
104             except pykka.ActorDeadError:
105                 pass
106
107     def schedule(self, want_time, callback, *args, **kwargs):
108         with self.lock:
109             self.messages.append((callback, args, kwargs))
110         if self.deliver_immediately:
111             self.deliver()
112
113
114 class ActorTestMixin(object):
115     FUTURE_CLASS = pykka.ThreadingFuture
116     TIMEOUT = pykka_timeout
117
118     def tearDown(self):
119         pykka.ActorRegistry.stop_all()
120
121     def stop_proxy(self, proxy):
122         return proxy.actor_ref.stop(timeout=self.TIMEOUT)
123
124     def wait_for_assignment(self, proxy, attr_name, unassigned=None,
125                             timeout=TIMEOUT):
126         deadline = time.time() + timeout
127         while True:
128             loop_timeout = deadline - time.time()
129             if loop_timeout <= 0:
130                 self.fail("actor did not assign {} in time".format(attr_name))
131             result = getattr(proxy, attr_name).get(loop_timeout)
132             if result is not unassigned:
133                 return result
134
135     def busywait(self, f):
136         n = 0
137         while not f() and n < 10:
138             time.sleep(.1)
139             n += 1
140         self.assertTrue(f())
141
142
143 class DriverTestMixin(object):
144     def setUp(self):
145         self.driver_mock = mock.MagicMock(name='driver_mock')
146         super(DriverTestMixin, self).setUp()
147
148     def new_driver(self, auth_kwargs={}, list_kwargs={}, create_kwargs={}):
149         create_kwargs.setdefault('ping_host', '100::')
150         return self.TEST_CLASS(
151             auth_kwargs, list_kwargs, create_kwargs,
152             driver_class=self.driver_mock)
153
154     def driver_method_args(self, method_name):
155         return getattr(self.driver_mock(), method_name).call_args
156
157     def test_driver_create_retry(self):
158         with mock.patch('time.sleep'):
159             driver_mock2 = mock.MagicMock(name='driver_mock2')
160             self.driver_mock.side_effect = (Exception("oops"), driver_mock2)
161             kwargs = {'user_id': 'foo'}
162             driver = self.new_driver(auth_kwargs=kwargs)
163             self.assertTrue(self.driver_mock.called)
164             self.assertIs(driver.real, driver_mock2)
165
166     def test_create_can_find_node_after_timeout(self, create_kwargs={}, node_extra={}):
167         driver = self.new_driver(create_kwargs=create_kwargs)
168         arv_node = arvados_node_mock()
169         cloud_node = cloud_node_mock(**node_extra)
170         cloud_node.name = driver.create_cloud_name(arv_node)
171         create_method = self.driver_mock().create_node
172         create_method.side_effect = cloud_types.LibcloudError("fake timeout")
173         list_method = self.driver_mock().list_nodes
174         list_method.return_value = [cloud_node]
175         actual = driver.create_node(MockSize(1), arv_node)
176         self.assertIs(cloud_node, actual)
177
178     def test_create_can_raise_exception_after_timeout(self):
179         driver = self.new_driver()
180         arv_node = arvados_node_mock()
181         create_method = self.driver_mock().create_node
182         create_method.side_effect = cloud_types.LibcloudError("fake timeout")
183         list_method = self.driver_mock().list_nodes
184         list_method.return_value = []
185         with self.assertRaises(cloud_types.LibcloudError) as exc_test:
186             driver.create_node(MockSize(1), arv_node)
187         self.assertIs(create_method.side_effect, exc_test.exception)
188
189     def check_node_found_after_timeout_has_fixed_size(self, size, cloud_node,
190                                                       create_kwargs={}):
191         # This method needs to be called explicitly by driver test suites
192         # that need it.
193         self.driver_mock().list_sizes.return_value = [size]
194         driver = self.new_driver(create_kwargs=create_kwargs)
195         arv_node = arvados_node_mock()
196         cloud_node.name = driver.create_cloud_name(arv_node)
197         create_method = self.driver_mock().create_node
198         create_method.side_effect = cloud_types.LibcloudError("fake timeout")
199         self.driver_mock().list_nodes.return_value = [cloud_node]
200         actual = driver.create_node(size, arv_node)
201         self.assertIs(size, actual.size)
202
203
204 class RemotePollLoopActorTestMixin(ActorTestMixin):
205     def build_monitor(self, *args, **kwargs):
206         self.timer = mock.MagicMock(name='timer_mock')
207         self.client = mock.MagicMock(name='client_mock')
208         self.subscriber = mock.Mock(name='subscriber_mock')
209         self.monitor = self.TEST_CLASS.start(
210             self.client, self.timer, *args, **kwargs).proxy()
211
212 def cloud_node_mock(node_num=99, size=MockSize(1), **extra):
213     node = mock.NonCallableMagicMock(
214         ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
215          'image', 'extra'],
216         name='cloud_node')
217     node.id = str(node_num)
218     node.name = node.id
219     node.size = size
220     node.public_ips = []
221     node.private_ips = [ip_address_mock(node_num)]
222     node.extra = extra
223     return node