21207: Remove Python slow/short tests support
[arvados.git] / sdk / python / tests / arvados_testutil.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import arvados
6 import contextlib
7 import errno
8 import hashlib
9 import http.client
10 import httplib2
11 import io
12 import os
13 import pycurl
14 import queue
15 import shutil
16 import sys
17 import tempfile
18 import unittest
19
20 from io import StringIO, BytesIO
21 from unittest import mock
22
23 # Use this hostname when you want to make sure the traffic will be
24 # instantly refused.  100::/64 is a dedicated black hole.
25 TEST_HOST = '100::'
26
27 skip_sleep = mock.patch('time.sleep', lambda n: None)  # clown'll eat me
28
29 def queue_with(items):
30     """Return a thread-safe iterator that yields the given items.
31
32     +items+ can be given as an array or an iterator. If an iterator is
33     given, it will be consumed to fill the queue before queue_with()
34     returns.
35     """
36     q = queue.Queue()
37     for val in items:
38         q.put(val)
39     return lambda *args, **kwargs: q.get(block=False)
40
41 # fake_httplib2_response and mock_responses
42 # mock calls to httplib2.Http.request()
43 def fake_httplib2_response(code, **headers):
44     headers.update(status=str(code),
45                    reason=http.client.responses.get(code, "Unknown Response"))
46     return httplib2.Response(headers)
47
48 def mock_responses(body, *codes, **headers):
49     if not isinstance(body, bytes) and hasattr(body, 'encode'):
50         body = body.encode()
51     return mock.patch('httplib2.Http.request', side_effect=queue_with((
52         (fake_httplib2_response(code, **headers), body) for code in codes)))
53
54 def mock_api_responses(api_client, body, codes, headers={}, method='request'):
55     if not isinstance(body, bytes) and hasattr(body, 'encode'):
56         body = body.encode()
57     return mock.patch.object(api_client._http, method, side_effect=queue_with((
58         (fake_httplib2_response(code, **headers), body) for code in codes)))
59
60 def str_keep_locator(s):
61     return '{}+{}'.format(hashlib.md5(s if isinstance(s, bytes) else s.encode()).hexdigest(), len(s))
62
63 @contextlib.contextmanager
64 def redirected_streams(stdout=None, stderr=None):
65     if stdout == StringIO:
66         stdout = StringIO()
67     if stderr == StringIO:
68         stderr = StringIO()
69     orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
70     orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
71     try:
72         yield (stdout, stderr)
73     finally:
74         sys.stdout = orig_stdout
75         sys.stderr = orig_stderr
76
77
78 class VersionChecker(object):
79     def assertVersionOutput(self, out, err):
80         if sys.version_info >= (3, 0):
81             self.assertEqual(err.getvalue(), '')
82             v = out.getvalue()
83         else:
84             # Python 2 writes version info on stderr.
85             self.assertEqual(out.getvalue(), '')
86             v = err.getvalue()
87         self.assertRegex(v, r"[0-9]+\.[0-9]+\.[0-9]+(\.dev[0-9]+)?$\n")
88
89
90 class FakeCurl(object):
91     @classmethod
92     def make(cls, code, body=b'', headers={}):
93         if not isinstance(body, bytes) and hasattr(body, 'encode'):
94             body = body.encode()
95         return mock.Mock(spec=cls, wraps=cls(code, body, headers))
96
97     def __init__(self, code=200, body=b'', headers={}):
98         self._opt = {}
99         self._got_url = None
100         self._writer = None
101         self._headerfunction = None
102         self._resp_code = code
103         self._resp_body = body
104         self._resp_headers = headers
105
106     def getopt(self, opt):
107         return self._opt.get(str(opt), None)
108
109     def setopt(self, opt, val):
110         self._opt[str(opt)] = val
111         if opt == pycurl.WRITEFUNCTION:
112             self._writer = val
113         elif opt == pycurl.HEADERFUNCTION:
114             self._headerfunction = val
115
116     def perform(self):
117         if not isinstance(self._resp_code, int):
118             raise self._resp_code
119         if self.getopt(pycurl.URL) is None:
120             raise ValueError
121         if self._writer is None:
122             raise ValueError
123         if self._headerfunction:
124             self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
125             for k, v in self._resp_headers.items():
126                 self._headerfunction(k + ': ' + str(v))
127         if type(self._resp_body) is not bool:
128             self._writer(self._resp_body)
129
130     def close(self):
131         pass
132
133     def reset(self):
134         """Prevent fake UAs from going back into the user agent pool."""
135         raise Exception
136
137     def getinfo(self, opt):
138         if opt == pycurl.RESPONSE_CODE:
139             return self._resp_code
140         raise Exception
141
142 def mock_keep_responses(body, *codes, **headers):
143     """Patch pycurl to return fake responses and raise exceptions.
144
145     body can be a string to return as the response body; an exception
146     to raise when perform() is called; or an iterable that returns a
147     sequence of such values.
148     """
149     cm = mock.MagicMock()
150     if isinstance(body, tuple):
151         codes = list(codes)
152         codes.insert(0, body)
153         responses = [
154             FakeCurl.make(code=code, body=b, headers=headers)
155             for b, code in codes
156         ]
157     else:
158         responses = [
159             FakeCurl.make(code=code, body=body, headers=headers)
160             for code in codes
161         ]
162     cm.side_effect = queue_with(responses)
163     cm.responses = responses
164     return mock.patch('pycurl.Curl', cm)
165
166
167 class MockStreamReader(object):
168     def __init__(self, name='.', *data):
169         self._name = name
170         self._data = b''.join([
171             b if isinstance(b, bytes) else b.encode()
172             for b in data])
173         self._data_locators = [str_keep_locator(d) for d in data]
174         self.num_retries = 0
175
176     def name(self):
177         return self._name
178
179     def readfrom(self, start, size, num_retries=None):
180         return self._data[start:start + size]
181
182 class ApiClientMock(object):
183     def api_client_mock(self):
184         api_mock = mock.MagicMock(name='api_client_mock')
185         api_mock.config.return_value = {
186             'StorageClasses': {
187                 'default': {'Default': True}
188             }
189         }
190         return api_mock
191
192     def mock_keep_services(self, api_mock=None, status=200, count=12,
193                            service_type='disk',
194                            service_host=None,
195                            service_port=None,
196                            service_ssl_flag=False,
197                            additional_services=[],
198                            read_only=False):
199         if api_mock is None:
200             api_mock = self.api_client_mock()
201         body = {
202             'items_available': count,
203             'items': [{
204                 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
205                 'owner_uuid': 'zzzzz-tpzed-000000000000000',
206                 'service_host': service_host or 'keep0x{:x}'.format(i),
207                 'service_port': service_port or 65535-i,
208                 'service_ssl_flag': service_ssl_flag,
209                 'service_type': service_type,
210                 'read_only': read_only,
211             } for i in range(0, count)] + additional_services
212         }
213         self._mock_api_call(api_mock.keep_services().accessible, status, body)
214         return api_mock
215
216     def _mock_api_call(self, mock_method, code, body):
217         mock_method = mock_method().execute
218         if code == 200:
219             mock_method.return_value = body
220         else:
221             mock_method.side_effect = arvados.errors.ApiError(
222                 fake_httplib2_response(code), b"{}")
223
224
225 class ArvadosBaseTestCase(unittest.TestCase):
226     # This class provides common utility functions for our tests.
227
228     def setUp(self):
229         self._tempdirs = []
230
231     def tearDown(self):
232         for workdir in self._tempdirs:
233             shutil.rmtree(workdir, ignore_errors=True)
234
235     def make_tmpdir(self):
236         self._tempdirs.append(tempfile.mkdtemp())
237         return self._tempdirs[-1]
238
239     def data_file(self, filename):
240         try:
241             basedir = os.path.dirname(__file__)
242         except NameError:
243             basedir = '.'
244         return open(os.path.join(basedir, 'data', filename))
245
246     def build_directory_tree(self, tree):
247         tree_root = self.make_tmpdir()
248         for leaf in tree:
249             path = os.path.join(tree_root, leaf)
250             try:
251                 os.makedirs(os.path.dirname(path))
252             except OSError as error:
253                 if error.errno != errno.EEXIST:
254                     raise
255             with open(path, 'w') as tmpfile:
256                 tmpfile.write(leaf)
257         return tree_root
258
259     def make_test_file(self, text=b"test"):
260         testfile = tempfile.NamedTemporaryFile()
261         testfile.write(text)
262         testfile.flush()
263         return testfile
264
265 if sys.version_info < (3, 0):
266     # There is no assert[Not]Regex that works in both Python 2 and 3,
267     # so we backport Python 3 style to Python 2.
268     def assertRegex(self, *args, **kwargs):
269         return self.assertRegexpMatches(*args, **kwargs)
270     def assertNotRegex(self, *args, **kwargs):
271         return self.assertNotRegexpMatches(*args, **kwargs)
272     unittest.TestCase.assertRegex = assertRegex
273     unittest.TestCase.assertNotRegex = assertNotRegex
274
275 def binary_compare(a, b):
276     if len(a) != len(b):
277         return False
278     for i in range(0, len(a)):
279         if a[i] != b[i]:
280             return False
281     return True
282
283 def make_block_cache(disk_cache):
284     if disk_cache:
285         disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
286         shutil.rmtree(disk_cache_dir, ignore_errors=True)
287     block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
288     return block_cache
289
290
291 class DiskCacheBase:
292     def make_block_cache(self, disk_cache):
293         self.disk_cache_dir = tempfile.mkdtemp() if disk_cache else None
294         block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache,
295                                                   disk_cache_dir=self.disk_cache_dir)
296         return block_cache
297
298     def tearDown(self):
299         if self.disk_cache_dir:
300             shutil.rmtree(self.disk_cache_dir)