15397: Remove code, configs, and docs for hosted git repo feature.
[arvados.git] / sdk / python / tests / arvados_testutil.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 import arvados
8 import contextlib
9 import errno
10 import hashlib
11 import http.client
12 import httplib2
13 import io
14 import os
15 import pycurl
16 import queue
17 import shutil
18 import sys
19 import tempfile
20 import unittest
21
22 from io import StringIO, BytesIO
23 from unittest import mock
24
25 # Use this hostname when you want to make sure the traffic will be
26 # instantly refused.  100::/64 is a dedicated black hole.
27 TEST_HOST = '100::'
28
29 skip_sleep = mock.patch('time.sleep', lambda n: None)  # clown'll eat me
30
31 def queue_with(items):
32     """Return a thread-safe iterator that yields the given items.
33
34     +items+ can be given as an array or an iterator. If an iterator is
35     given, it will be consumed to fill the queue before queue_with()
36     returns.
37     """
38     q = queue.Queue()
39     for val in items:
40         q.put(val)
41     return lambda *args, **kwargs: q.get(block=False)
42
43 # fake_httplib2_response and mock_responses
44 # mock calls to httplib2.Http.request()
45 def fake_httplib2_response(code, **headers):
46     headers.update(status=str(code),
47                    reason=http.client.responses.get(code, "Unknown Response"))
48     return httplib2.Response(headers)
49
50 def mock_responses(body, *codes, **headers):
51     if not isinstance(body, bytes) and hasattr(body, 'encode'):
52         body = body.encode()
53     return mock.patch('httplib2.Http.request', side_effect=queue_with((
54         (fake_httplib2_response(code, **headers), body) for code in codes)))
55
56 def mock_api_responses(api_client, body, codes, headers={}, method='request'):
57     if not isinstance(body, bytes) and hasattr(body, 'encode'):
58         body = body.encode()
59     return mock.patch.object(api_client._http, method, side_effect=queue_with((
60         (fake_httplib2_response(code, **headers), body) for code in codes)))
61
62 def str_keep_locator(s):
63     return '{}+{}'.format(hashlib.md5(s if isinstance(s, bytes) else s.encode()).hexdigest(), len(s))
64
65 @contextlib.contextmanager
66 def redirected_streams(stdout=None, stderr=None):
67     if stdout == StringIO:
68         stdout = StringIO()
69     if stderr == StringIO:
70         stderr = StringIO()
71     orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
72     orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
73     try:
74         yield (stdout, stderr)
75     finally:
76         sys.stdout = orig_stdout
77         sys.stderr = orig_stderr
78
79
80 class VersionChecker(object):
81     def assertVersionOutput(self, out, err):
82         if sys.version_info >= (3, 0):
83             self.assertEqual(err.getvalue(), '')
84             v = out.getvalue()
85         else:
86             # Python 2 writes version info on stderr.
87             self.assertEqual(out.getvalue(), '')
88             v = err.getvalue()
89         self.assertRegex(v, r"[0-9]+\.[0-9]+\.[0-9]+(\.dev[0-9]+)?$\n")
90
91
92 class FakeCurl(object):
93     @classmethod
94     def make(cls, code, body=b'', headers={}):
95         if not isinstance(body, bytes) and hasattr(body, 'encode'):
96             body = body.encode()
97         return mock.Mock(spec=cls, wraps=cls(code, body, headers))
98
99     def __init__(self, code=200, body=b'', headers={}):
100         self._opt = {}
101         self._got_url = None
102         self._writer = None
103         self._headerfunction = None
104         self._resp_code = code
105         self._resp_body = body
106         self._resp_headers = headers
107
108     def getopt(self, opt):
109         return self._opt.get(str(opt), None)
110
111     def setopt(self, opt, val):
112         self._opt[str(opt)] = val
113         if opt == pycurl.WRITEFUNCTION:
114             self._writer = val
115         elif opt == pycurl.HEADERFUNCTION:
116             self._headerfunction = val
117
118     def perform(self):
119         if not isinstance(self._resp_code, int):
120             raise self._resp_code
121         if self.getopt(pycurl.URL) is None:
122             raise ValueError
123         if self._writer is None:
124             raise ValueError
125         if self._headerfunction:
126             self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
127             for k, v in self._resp_headers.items():
128                 self._headerfunction(k + ': ' + str(v))
129         if type(self._resp_body) is not bool:
130             self._writer(self._resp_body)
131
132     def close(self):
133         pass
134
135     def reset(self):
136         """Prevent fake UAs from going back into the user agent pool."""
137         raise Exception
138
139     def getinfo(self, opt):
140         if opt == pycurl.RESPONSE_CODE:
141             return self._resp_code
142         raise Exception
143
144 def mock_keep_responses(body, *codes, **headers):
145     """Patch pycurl to return fake responses and raise exceptions.
146
147     body can be a string to return as the response body; an exception
148     to raise when perform() is called; or an iterable that returns a
149     sequence of such values.
150     """
151     cm = mock.MagicMock()
152     if isinstance(body, tuple):
153         codes = list(codes)
154         codes.insert(0, body)
155         responses = [
156             FakeCurl.make(code=code, body=b, headers=headers)
157             for b, code in codes
158         ]
159     else:
160         responses = [
161             FakeCurl.make(code=code, body=body, headers=headers)
162             for code in codes
163         ]
164     cm.side_effect = queue_with(responses)
165     cm.responses = responses
166     return mock.patch('pycurl.Curl', cm)
167
168
169 class MockStreamReader(object):
170     def __init__(self, name='.', *data):
171         self._name = name
172         self._data = b''.join([
173             b if isinstance(b, bytes) else b.encode()
174             for b in data])
175         self._data_locators = [str_keep_locator(d) for d in data]
176         self.num_retries = 0
177
178     def name(self):
179         return self._name
180
181     def readfrom(self, start, size, num_retries=None):
182         return self._data[start:start + size]
183
184 class ApiClientMock(object):
185     def api_client_mock(self):
186         api_mock = mock.MagicMock(name='api_client_mock')
187         api_mock.config.return_value = {
188             'StorageClasses': {
189                 'default': {'Default': True}
190             }
191         }
192         return api_mock
193
194     def mock_keep_services(self, api_mock=None, status=200, count=12,
195                            service_type='disk',
196                            service_host=None,
197                            service_port=None,
198                            service_ssl_flag=False,
199                            additional_services=[],
200                            read_only=False):
201         if api_mock is None:
202             api_mock = self.api_client_mock()
203         body = {
204             'items_available': count,
205             'items': [{
206                 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
207                 'owner_uuid': 'zzzzz-tpzed-000000000000000',
208                 'service_host': service_host or 'keep0x{:x}'.format(i),
209                 'service_port': service_port or 65535-i,
210                 'service_ssl_flag': service_ssl_flag,
211                 'service_type': service_type,
212                 'read_only': read_only,
213             } for i in range(0, count)] + additional_services
214         }
215         self._mock_api_call(api_mock.keep_services().accessible, status, body)
216         return api_mock
217
218     def _mock_api_call(self, mock_method, code, body):
219         mock_method = mock_method().execute
220         if code == 200:
221             mock_method.return_value = body
222         else:
223             mock_method.side_effect = arvados.errors.ApiError(
224                 fake_httplib2_response(code), b"{}")
225
226
227 class ArvadosBaseTestCase(unittest.TestCase):
228     # This class provides common utility functions for our tests.
229
230     def setUp(self):
231         self._tempdirs = []
232
233     def tearDown(self):
234         for workdir in self._tempdirs:
235             shutil.rmtree(workdir, ignore_errors=True)
236
237     def make_tmpdir(self):
238         self._tempdirs.append(tempfile.mkdtemp())
239         return self._tempdirs[-1]
240
241     def data_file(self, filename):
242         try:
243             basedir = os.path.dirname(__file__)
244         except NameError:
245             basedir = '.'
246         return open(os.path.join(basedir, 'data', filename))
247
248     def build_directory_tree(self, tree):
249         tree_root = self.make_tmpdir()
250         for leaf in tree:
251             path = os.path.join(tree_root, leaf)
252             try:
253                 os.makedirs(os.path.dirname(path))
254             except OSError as error:
255                 if error.errno != errno.EEXIST:
256                     raise
257             with open(path, 'w') as tmpfile:
258                 tmpfile.write(leaf)
259         return tree_root
260
261     def make_test_file(self, text=b"test"):
262         testfile = tempfile.NamedTemporaryFile()
263         testfile.write(text)
264         testfile.flush()
265         return testfile
266
267 if sys.version_info < (3, 0):
268     # There is no assert[Not]Regex that works in both Python 2 and 3,
269     # so we backport Python 3 style to Python 2.
270     def assertRegex(self, *args, **kwargs):
271         return self.assertRegexpMatches(*args, **kwargs)
272     def assertNotRegex(self, *args, **kwargs):
273         return self.assertNotRegexpMatches(*args, **kwargs)
274     unittest.TestCase.assertRegex = assertRegex
275     unittest.TestCase.assertNotRegex = assertNotRegex
276
277 def binary_compare(a, b):
278     if len(a) != len(b):
279         return False
280     for i in range(0, len(a)):
281         if a[i] != b[i]:
282             return False
283     return True
284
285 def make_block_cache(disk_cache):
286     if disk_cache:
287         disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
288         shutil.rmtree(disk_cache_dir, ignore_errors=True)
289     block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
290     return block_cache
291
292
293 class DiskCacheBase:
294     def make_block_cache(self, disk_cache):
295         self.disk_cache_dir = tempfile.mkdtemp() if disk_cache else None
296         block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache,
297                                                   disk_cache_dir=self.disk_cache_dir)
298         return block_cache
299
300     def tearDown(self):
301         if self.disk_cache_dir:
302             shutil.rmtree(self.disk_cache_dir)