1 from future import standard_library
2 standard_library.install_aliases()
3 from builtins import str
4 from builtins import range
5 from builtins import object
22 if sys.version_info >= (3, 0):
23 from io import StringIO, BytesIO
25 from cStringIO import StringIO
28 # Use this hostname when you want to make sure the traffic will be
29 # instantly refused. 100::/64 is a dedicated black hole.
# Patcher that swaps time.sleep for a function that ignores its argument and
# returns None immediately.  mock.patch objects can be used as a decorator or
# as a context manager.
skip_sleep = mock.patch('time.sleep', lambda n: None)  # clown'll eat me
def queue_with(items):
    """Return a thread-safe callable that hands out the given items in order.

    +items+ can be given as an array or an iterator. If an iterator is
    given, it will be consumed to fill the queue before queue_with()
    returns.

    The returned callable accepts and ignores any positional or keyword
    arguments, so it can be used directly as a mock ``side_effect``.
    Each call returns the next queued item without blocking; calling it
    after the items run out raises ``queue.Empty``.
    """
    # Imported locally so this helper does not depend on a module-level
    # import being present.
    import queue
    q = queue.Queue()
    for val in items:
        q.put(val)
    return lambda *args, **kwargs: q.get(block=False)
46 # fake_httplib2_response and mock_responses
47 # mock calls to httplib2.Http.request()
def fake_httplib2_response(code, **headers):
    """Build an httplib2.Response for the given status code and headers.

    code is the HTTP status code.  Any keyword arguments become response
    headers.  The ``status`` and ``reason`` entries are always derived
    from code (``reason`` falls back to "Unknown Response" for codes not
    listed in http.client.responses), replacing any caller-supplied
    values of the same name.
    """
    headers.update(status=str(code),
                   reason=http.client.responses.get(code, "Unknown Response"))
    return httplib2.Response(headers)
def mock_responses(body, *codes, **headers):
    """Patch httplib2.Http.request to return one canned response per code.

    body is the response content; text is encoded to bytes first.  Each
    entry in codes yields one (response, body) pair, served in order, with
    headers applied to every response.  Returns the mock.patch object.
    """
    if not isinstance(body, bytes) and hasattr(body, 'encode'):
        # Callers receive the body as bytes, as they would from a real
        # HTTP response.
        body = body.encode()
    return mock.patch('httplib2.Http.request', side_effect=queue_with((
        (fake_httplib2_response(code, **headers), body) for code in codes)))
def mock_api_responses(api_client, body, codes, headers=None):
    """Patch api_client._http.request to return one canned response per code.

    body is the response content; text is encoded to bytes first.  codes is
    an iterable of HTTP status codes, one per expected request, served in
    order.  headers is an optional dict of response headers applied to
    every response.  Returns the mock.patch object.
    """
    if headers is None:
        # Avoid a shared mutable default argument.
        headers = {}
    if not isinstance(body, bytes) and hasattr(body, 'encode'):
        body = body.encode()
    return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
        (fake_httplib2_response(code, **headers), body) for code in codes)))
def str_keep_locator(s):
    """Return the '<md5 hexdigest>+<size>' locator string for s.

    s may be bytes or text; text is encoded with the default codec before
    hashing.  The size is the length of the same bytes that were hashed,
    so the locator is identical whether the data is passed as text or as
    its encoded form.
    """
    data = s if isinstance(s, bytes) else s.encode()
    return '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
@contextlib.contextmanager
def redirected_streams(stdout=None, stderr=None):
    """Temporarily replace sys.stdout and/or sys.stderr.

    Each argument may be a file-like object to install, the StringIO
    class itself (a fresh StringIO instance is created and installed), or
    None to leave that stream alone.  The context manager yields the
    (stdout, stderr) pair as resolved, so callers can inspect captured
    output.  The original streams are restored on exit, including when
    the body raises.
    """
    if stdout == StringIO:
        stdout = StringIO()
    if stderr == StringIO:
        stderr = StringIO()
    orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
    orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
    try:
        yield (stdout, stderr)
    finally:
        sys.stdout = orig_stdout
        sys.stderr = orig_stderr
class VersionChecker(object):
    """Mixin providing an assertion for the output of a version flag.

    Relies on the host class supplying assertEqual and assertRegex.
    """

    def assertVersionOutput(self, out, err):
        """Assert that a version string was written to the expected stream.

        out and err are objects with getvalue() holding the captured
        stdout and stderr.  On Python 3 the version must be on stdout with
        stderr empty; on Python 2 it is the other way round.
        """
        if sys.version_info >= (3, 0):
            self.assertEqual(err.getvalue(), '')
            v = out.getvalue()
        else:
            # Python 2 writes version info on stderr.
            self.assertEqual(out.getvalue(), '')
            v = err.getvalue()
        self.assertRegex(v, r"[0-9]+\.[0-9]+\.[0-9]+$\n")
class FakeCurl(object):
    """Minimal stand-in for a pycurl.Curl handle that replays a canned response.

    The response is fixed at construction time.  perform() either raises
    (when the configured code is not an int) or feeds the canned status
    line, headers and body to the callbacks registered through setopt().
    """

    @classmethod
    def make(cls, code, body=b'', headers=None):
        """Return a mock.Mock wrapping a new FakeCurl, so calls are recorded.

        Text bodies are encoded to bytes; other body values are passed
        through unchanged.
        """
        if not isinstance(body, bytes) and hasattr(body, 'encode'):
            body = body.encode()
        return mock.Mock(spec=cls, wraps=cls(code, body, headers))

    def __init__(self, code=200, body=b'', headers=None):
        """Store the canned response.

        code is an int status code, or an exception to raise from
        perform().  body is handed to the write callback unless it is a
        bool.  headers is an optional dict of response headers.
        """
        self._opt = {}
        self._writer = None
        self._headerfunction = None
        self._resp_code = code
        self._resp_body = body
        # Avoid a shared mutable default argument.
        self._resp_headers = {} if headers is None else headers

    def getopt(self, opt):
        """Return the value last passed to setopt() for opt, or None."""
        return self._opt.get(str(opt), None)

    def setopt(self, opt, val):
        """Record an option, remembering the write and header callbacks."""
        self._opt[str(opt)] = val
        if opt == pycurl.WRITEFUNCTION:
            self._writer = val
        elif opt == pycurl.HEADERFUNCTION:
            self._headerfunction = val

    def perform(self):
        """Replay the canned response through the registered callbacks."""
        if not isinstance(self._resp_code, int):
            # A non-int "code" is an exception the test wants raised.
            raise self._resp_code
        # NOTE(review): the exception type raised for a missing URL or
        # write callback is not visible in the reviewed listing;
        # ValueError is assumed -- confirm against the original file.
        if self.getopt(pycurl.URL) is None:
            raise ValueError
        if self._writer is None:
            raise ValueError
        if self._headerfunction:
            self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
            for k, v in self._resp_headers.items():
                self._headerfunction(k + ': ' + str(v))
        # A bool body means "send no body at all".
        if type(self._resp_body) is not bool:
            self._writer(self._resp_body)

    def close(self):
        # NOTE(review): a no-op close() is assumed here; the method is not
        # visible in the reviewed listing -- confirm against the original.
        pass

    def reset(self):
        """Prevent fake UAs from going back into the user agent pool."""
        # NOTE(review): assumed to raise so a fake handle is never reused;
        # the statement is not visible in the reviewed listing.
        raise Exception

    def getinfo(self, opt):
        """Return the canned status code for pycurl.RESPONSE_CODE."""
        if opt == pycurl.RESPONSE_CODE:
            return self._resp_code
        # NOTE(review): behaviour for other options is not visible in the
        # reviewed listing; raising is assumed -- confirm.
        raise Exception
def mock_keep_responses(body, *codes, **headers):
    """Patch pycurl to return fake responses and raise exceptions.

    In the simple form, body is the response body shared by every
    response and each entry of codes is either an int status code or an
    exception to raise when perform() is called.

    If body is a tuple, it is treated as the first of a series of
    (body, code) pairs and every entry of codes must be such a pair too,
    so each response can carry its own body.

    headers are applied to every fake response.  Returns the mock.patch
    object for 'pycurl.Curl'; the replacement mock exposes the prepared
    FakeCurl objects as its ``responses`` attribute.
    """
    cm = mock.MagicMock()
    if isinstance(body, tuple):
        # codes arrives as an immutable tuple of varargs; copy it before
        # putting the leading (body, code) pair back at the front.
        codes = list(codes)
        codes.insert(0, body)
        responses = [
            FakeCurl.make(code=code, body=b, headers=headers)
            for b, code in codes
        ]
    else:
        responses = [
            FakeCurl.make(code=code, body=body, headers=headers)
            for code in codes
        ]
    cm.side_effect = queue_with(responses)
    cm.responses = responses
    return mock.patch('pycurl.Curl', cm)
class MockStreamReader(object):
    """In-memory stand-in for a stream reader backed by fixed data chunks."""

    def __init__(self, name='.', *data):
        """Store the stream name and concatenate the data chunks.

        Each chunk may be bytes or text; text is encoded to bytes.  One
        locator per chunk is computed with str_keep_locator().
        """
        # NOTE(review): _name, num_retries and name() are assumed; they are
        # not visible in the reviewed listing -- confirm against the
        # original file.
        self._name = name
        self._data = b''.join([
            b if isinstance(b, bytes) else b.encode()
            for b in data])
        self._data_locators = [str_keep_locator(d) for d in data]
        self.num_retries = 0

    def name(self):
        return self._name

    def readfrom(self, start, size, num_retries=None):
        """Return up to size bytes starting at offset start.

        num_retries is accepted but not used.
        """
        return self._data[start:start + size]
class ApiClientMock(object):
    """Mixin that builds mock API clients with canned keep_services results."""

    def api_client_mock(self):
        """Return a fresh MagicMock to act as the API client."""
        return mock.MagicMock(name='api_client_mock')

    # NOTE(review): the defaults and relative order of service_type,
    # service_host, service_port and read_only are assumed; their
    # declarations are not visible in the reviewed listing -- confirm
    # against the original file.
    def mock_keep_services(self, api_mock=None, status=200, count=12,
                           service_type='disk',
                           service_host=None,
                           service_port=None,
                           service_ssl_flag=False,
                           additional_services=None,
                           read_only=False):
        """Make api_mock.keep_services().accessible() return fake services.

        count services are generated with sequential uuids.  A falsy
        service_host or service_port selects a per-service default
        ('keep0x<i>' and 65535-i respectively).  additional_services is an
        optional list of extra service dicts appended after the generated
        ones.  A status other than 200 makes the call raise instead (see
        _mock_api_call).  Returns the API mock, creating one if api_mock
        is None.
        """
        if api_mock is None:
            api_mock = self.api_client_mock()
        # Avoid a shared mutable default argument.
        extra_services = [] if additional_services is None else additional_services
        body = {
            'items_available': count,
            'items': [{
                'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': service_host or 'keep0x{:x}'.format(i),
                'service_port': service_port or 65535-i,
                'service_ssl_flag': service_ssl_flag,
                'service_type': service_type,
                'read_only': read_only,
            } for i in range(0, count)] + extra_services
        }
        self._mock_api_call(api_mock.keep_services().accessible, status, body)
        return api_mock

    def _mock_api_call(self, mock_method, code, body):
        """Configure mock_method().execute to return body or raise ApiError.

        A code of 200 makes execute() return body; any other code makes it
        raise arvados.errors.ApiError built from a fake response with that
        status.
        """
        mock_method = mock_method().execute
        if code == 200:
            mock_method.return_value = body
        else:
            mock_method.side_effect = arvados.errors.ApiError(
                fake_httplib2_response(code), b"{}")
class ArvadosBaseTestCase(unittest.TestCase):
    # This class provides common utility functions for our tests.

    def setUp(self):
        # Directories handed out by make_tmpdir(), removed in tearDown().
        self._tempdirs = []

    def tearDown(self):
        for workdir in self._tempdirs:
            shutil.rmtree(workdir, ignore_errors=True)

    def make_tmpdir(self):
        """Create a temporary directory that is removed in tearDown()."""
        self._tempdirs.append(tempfile.mkdtemp())
        return self._tempdirs[-1]

    def data_file(self, filename):
        """Open 'data/<filename>' next to this module and return the file.

        The caller is responsible for closing the returned file object.
        """
        # NOTE(review): the fallback to '.' when __file__ is undefined is
        # assumed; it is not visible in the reviewed listing -- confirm.
        try:
            basedir = os.path.dirname(__file__)
        except NameError:
            basedir = '.'
        return open(os.path.join(basedir, 'data', filename))

    def build_directory_tree(self, tree):
        """Create files under a fresh temporary directory and return its path.

        tree is an iterable of relative file paths.  Parent directories
        are created as needed.
        """
        tree_root = self.make_tmpdir()
        for leaf in tree:
            path = os.path.join(tree_root, leaf)
            try:
                os.makedirs(os.path.dirname(path))
            except OSError as error:
                # An already-existing parent directory is fine; anything
                # else is a real failure.
                if error.errno != errno.EEXIST:
                    raise
            with open(path, 'w') as tmpfile:
                # NOTE(review): writing the relative path as the file
                # content is assumed; confirm against the original file.
                tmpfile.write(leaf)
        return tree_root

    def make_test_file(self, text=b"test"):
        """Return an open NamedTemporaryFile containing text.

        The file is deleted when the returned object is closed.
        """
        # NOTE(review): the write/flush/return steps are assumed; they are
        # not visible in the reviewed listing -- confirm.
        testfile = tempfile.NamedTemporaryFile()
        testfile.write(text)
        testfile.flush()
        return testfile
if sys.version_info < (3, 0):
    # There is no assert[Not]Regex that works in both Python 2 and 3,
    # so we backport Python 3 style to Python 2.
    def assertRegex(self, *args, **kwargs):
        return self.assertRegexpMatches(*args, **kwargs)

    def assertNotRegex(self, *args, **kwargs):
        return self.assertNotRegexpMatches(*args, **kwargs)

    # Installed on unittest.TestCase itself, so every test case class in
    # the process gets the Python 3 names.
    unittest.TestCase.assertRegex = assertRegex
    unittest.TestCase.assertNotRegex = assertNotRegex