8784: Fix test for latest firefox.
[arvados.git] / sdk / python / tests / arvados_testutil.py
1 from future import standard_library
2 standard_library.install_aliases()
3 from builtins import str
4 from builtins import range
5 from builtins import object
6 import arvados
7 import contextlib
8 import errno
9 import hashlib
10 import http.client
11 import httplib2
12 import io
13 import mock
14 import os
15 import pycurl
16 import queue
17 import shutil
18 import sys
19 import tempfile
20 import unittest
21
22 if sys.version_info >= (3, 0):
23     from io import StringIO, BytesIO
24 else:
25     from cStringIO import StringIO
26     BytesIO = StringIO
27
28 # Use this hostname when you want to make sure the traffic will be
29 # instantly refused.  100::/64 is a dedicated black hole.
30 TEST_HOST = '100::'
31
32 skip_sleep = mock.patch('time.sleep', lambda n: None)  # clown'll eat me
33
34 def queue_with(items):
35     """Return a thread-safe iterator that yields the given items.
36
37     +items+ can be given as an array or an iterator. If an iterator is
38     given, it will be consumed to fill the queue before queue_with()
39     returns.
40     """
41     q = queue.Queue()
42     for val in items:
43         q.put(val)
44     return lambda *args, **kwargs: q.get(block=False)
45
46 # fake_httplib2_response and mock_responses
47 # mock calls to httplib2.Http.request()
48 def fake_httplib2_response(code, **headers):
49     headers.update(status=str(code),
50                    reason=http.client.responses.get(code, "Unknown Response"))
51     return httplib2.Response(headers)
52
53 def mock_responses(body, *codes, **headers):
54     if not isinstance(body, bytes) and hasattr(body, 'encode'):
55         body = body.encode()
56     return mock.patch('httplib2.Http.request', side_effect=queue_with((
57         (fake_httplib2_response(code, **headers), body) for code in codes)))
58
59 def mock_api_responses(api_client, body, codes, headers={}):
60     if not isinstance(body, bytes) and hasattr(body, 'encode'):
61         body = body.encode()
62     return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
63         (fake_httplib2_response(code, **headers), body) for code in codes)))
64
65 def str_keep_locator(s):
66     return '{}+{}'.format(hashlib.md5(s if isinstance(s, bytes) else s.encode()).hexdigest(), len(s))
67
68 @contextlib.contextmanager
69 def redirected_streams(stdout=None, stderr=None):
70     if stdout == StringIO:
71         stdout = StringIO()
72     if stderr == StringIO:
73         stderr = StringIO()
74     orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
75     orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
76     try:
77         yield (stdout, stderr)
78     finally:
79         sys.stdout = orig_stdout
80         sys.stderr = orig_stderr
81
82
83 class VersionChecker(object):
84     def assertVersionOutput(self, out, err):
85         if sys.version_info >= (3, 0):
86             self.assertEqual(err.getvalue(), '')
87             v = out.getvalue()
88         else:
89             # Python 2 writes version info on stderr.
90             self.assertEqual(out.getvalue(), '')
91             v = err.getvalue()
92         self.assertRegex(v, r"[0-9]+\.[0-9]+\.[0-9]+$\n")
93
94
95 class FakeCurl(object):
96     @classmethod
97     def make(cls, code, body=b'', headers={}):
98         if not isinstance(body, bytes) and hasattr(body, 'encode'):
99             body = body.encode()
100         return mock.Mock(spec=cls, wraps=cls(code, body, headers))
101
102     def __init__(self, code=200, body=b'', headers={}):
103         self._opt = {}
104         self._got_url = None
105         self._writer = None
106         self._headerfunction = None
107         self._resp_code = code
108         self._resp_body = body
109         self._resp_headers = headers
110
111     def getopt(self, opt):
112         return self._opt.get(str(opt), None)
113
114     def setopt(self, opt, val):
115         self._opt[str(opt)] = val
116         if opt == pycurl.WRITEFUNCTION:
117             self._writer = val
118         elif opt == pycurl.HEADERFUNCTION:
119             self._headerfunction = val
120
121     def perform(self):
122         if not isinstance(self._resp_code, int):
123             raise self._resp_code
124         if self.getopt(pycurl.URL) is None:
125             raise ValueError
126         if self._writer is None:
127             raise ValueError
128         if self._headerfunction:
129             self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
130             for k, v in self._resp_headers.items():
131                 self._headerfunction(k + ': ' + str(v))
132         if type(self._resp_body) is not bool:
133             self._writer(self._resp_body)
134
135     def close(self):
136         pass
137
138     def reset(self):
139         """Prevent fake UAs from going back into the user agent pool."""
140         raise Exception
141
142     def getinfo(self, opt):
143         if opt == pycurl.RESPONSE_CODE:
144             return self._resp_code
145         raise Exception
146
147 def mock_keep_responses(body, *codes, **headers):
148     """Patch pycurl to return fake responses and raise exceptions.
149
150     body can be a string to return as the response body; an exception
151     to raise when perform() is called; or an iterable that returns a
152     sequence of such values.
153     """
154     cm = mock.MagicMock()
155     if isinstance(body, tuple):
156         codes = list(codes)
157         codes.insert(0, body)
158         responses = [
159             FakeCurl.make(code=code, body=b, headers=headers)
160             for b, code in codes
161         ]
162     else:
163         responses = [
164             FakeCurl.make(code=code, body=body, headers=headers)
165             for code in codes
166         ]
167     cm.side_effect = queue_with(responses)
168     cm.responses = responses
169     return mock.patch('pycurl.Curl', cm)
170
171
172 class MockStreamReader(object):
173     def __init__(self, name='.', *data):
174         self._name = name
175         self._data = b''.join([
176             b if isinstance(b, bytes) else b.encode()
177             for b in data])
178         self._data_locators = [str_keep_locator(d) for d in data]
179         self.num_retries = 0
180
181     def name(self):
182         return self._name
183
184     def readfrom(self, start, size, num_retries=None):
185         return self._data[start:start + size]
186
187 class ApiClientMock(object):
188     def api_client_mock(self):
189         return mock.MagicMock(name='api_client_mock')
190
191     def mock_keep_services(self, api_mock=None, status=200, count=12,
192                            service_type='disk',
193                            service_host=None,
194                            service_port=None,
195                            service_ssl_flag=False,
196                            additional_services=[],
197                            read_only=False):
198         if api_mock is None:
199             api_mock = self.api_client_mock()
200         body = {
201             'items_available': count,
202             'items': [{
203                 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
204                 'owner_uuid': 'zzzzz-tpzed-000000000000000',
205                 'service_host': service_host or 'keep0x{:x}'.format(i),
206                 'service_port': service_port or 65535-i,
207                 'service_ssl_flag': service_ssl_flag,
208                 'service_type': service_type,
209                 'read_only': read_only,
210             } for i in range(0, count)] + additional_services
211         }
212         self._mock_api_call(api_mock.keep_services().accessible, status, body)
213         return api_mock
214
215     def _mock_api_call(self, mock_method, code, body):
216         mock_method = mock_method().execute
217         if code == 200:
218             mock_method.return_value = body
219         else:
220             mock_method.side_effect = arvados.errors.ApiError(
221                 fake_httplib2_response(code), b"{}")
222
223
224 class ArvadosBaseTestCase(unittest.TestCase):
225     # This class provides common utility functions for our tests.
226
227     def setUp(self):
228         self._tempdirs = []
229
230     def tearDown(self):
231         for workdir in self._tempdirs:
232             shutil.rmtree(workdir, ignore_errors=True)
233
234     def make_tmpdir(self):
235         self._tempdirs.append(tempfile.mkdtemp())
236         return self._tempdirs[-1]
237
238     def data_file(self, filename):
239         try:
240             basedir = os.path.dirname(__file__)
241         except NameError:
242             basedir = '.'
243         return open(os.path.join(basedir, 'data', filename))
244
245     def build_directory_tree(self, tree):
246         tree_root = self.make_tmpdir()
247         for leaf in tree:
248             path = os.path.join(tree_root, leaf)
249             try:
250                 os.makedirs(os.path.dirname(path))
251             except OSError as error:
252                 if error.errno != errno.EEXIST:
253                     raise
254             with open(path, 'w') as tmpfile:
255                 tmpfile.write(leaf)
256         return tree_root
257
258     def make_test_file(self, text=b"test"):
259         testfile = tempfile.NamedTemporaryFile()
260         testfile.write(text)
261         testfile.flush()
262         return testfile
263
264 if sys.version_info < (3, 0):
265     # There is no assert[Not]Regex that works in both Python 2 and 3,
266     # so we backport Python 3 style to Python 2.
267     def assertRegex(self, *args, **kwargs):
268         return self.assertRegexpMatches(*args, **kwargs)
269     def assertNotRegex(self, *args, **kwargs):
270         return self.assertNotRegexpMatches(*args, **kwargs)
271     unittest.TestCase.assertRegex = assertRegex
272     unittest.TestCase.assertNotRegex = assertNotRegex