X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0f119f7707d93e90842fce3890deffc59c5e7081..ba34a22d9918ae97306472c04701e69090821c82:/sdk/python/tests/arvados_testutil.py diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py index a10802ae81..0035659796 100644 --- a/sdk/python/tests/arvados_testutil.py +++ b/sdk/python/tests/arvados_testutil.py @@ -1,19 +1,34 @@ -#!/usr/bin/env python +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 +from future import standard_library +standard_library.install_aliases() +from builtins import str +from builtins import range +from builtins import object import arvados +import contextlib import errno import hashlib -import httplib +import http.client import httplib2 import io import mock import os -import Queue -import requests +import pycurl +import queue import shutil +import sys import tempfile import unittest +if sys.version_info >= (3, 0): + from io import StringIO, BytesIO +else: + from cStringIO import StringIO + BytesIO = StringIO + # Use this hostname when you want to make sure the traffic will be # instantly refused. 100::/64 is a dedicated black hole. TEST_HOST = '100::' @@ -27,67 +42,144 @@ def queue_with(items): given, it will be consumed to fill the queue before queue_with() returns. """ - queue = Queue.Queue() + q = queue.Queue() for val in items: - queue.put(val) - return lambda *args, **kwargs: queue.get(block=False) + q.put(val) + return lambda *args, **kwargs: q.get(block=False) # fake_httplib2_response and mock_responses # mock calls to httplib2.Http.request() def fake_httplib2_response(code, **headers): headers.update(status=str(code), - reason=httplib.responses.get(code, "Unknown Response")) + reason=http.client.responses.get(code, "Unknown Response")) return httplib2.Response(headers) def mock_responses(body, *codes, **headers): + if not isinstance(body, bytes) and hasattr(body, 'encode'): + body = body.encode() return mock.patch('httplib2.Http.request', side_effect=queue_with(( (fake_httplib2_response(code, **headers), body) for code in codes))) -# fake_requests_response, mock_get_responses and mock_put_responses -# mock calls to requests.get() and requests.put() -def fake_requests_response(code, body, **headers): - r = requests.Response() - r.status_code = code - r.reason = httplib.responses.get(code, "Unknown Response") - r.headers = headers - r.raw = io.BytesIO(body) - return r - -# The following methods patch requests.Session(), where return_value is a mock -# Session object. The put/get attributes are set on mock Session, and the -# desired put/get behavior is set on the put/get mocks. - -def mock_put_responses(body, *codes, **headers): - m = mock.MagicMock() +def mock_api_responses(api_client, body, codes, headers={}): + if not isinstance(body, bytes) and hasattr(body, 'encode'): + body = body.encode() + return mock.patch.object(api_client._http, 'request', side_effect=queue_with(( + (fake_httplib2_response(code, **headers), body) for code in codes))) + +def str_keep_locator(s): + return '{}+{}'.format(hashlib.md5(s if isinstance(s, bytes) else s.encode()).hexdigest(), len(s)) + +@contextlib.contextmanager +def redirected_streams(stdout=None, stderr=None): + if stdout == StringIO: + stdout = StringIO() + if stderr == StringIO: + stderr = StringIO() + orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout + orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr + try: + yield (stdout, stderr) + finally: + sys.stdout = orig_stdout + sys.stderr = orig_stderr + + +class VersionChecker(object): + def assertVersionOutput(self, out, err): + if sys.version_info >= (3, 0): + self.assertEqual(err.getvalue(), '') + v = out.getvalue() + else: + # Python 2 writes version info on stderr. + self.assertEqual(out.getvalue(), '') + v = err.getvalue() + self.assertRegex(v, r"[0-9]+\.[0-9]+\.[0-9]+(\.dev[0-9]+)?$\n") + + +class FakeCurl(object): + @classmethod + def make(cls, code, body=b'', headers={}): + if not isinstance(body, bytes) and hasattr(body, 'encode'): + body = body.encode() + return mock.Mock(spec=cls, wraps=cls(code, body, headers)) + + def __init__(self, code=200, body=b'', headers={}): + self._opt = {} + self._got_url = None + self._writer = None + self._headerfunction = None + self._resp_code = code + self._resp_body = body + self._resp_headers = headers + + def getopt(self, opt): + return self._opt.get(str(opt), None) + + def setopt(self, opt, val): + self._opt[str(opt)] = val + if opt == pycurl.WRITEFUNCTION: + self._writer = val + elif opt == pycurl.HEADERFUNCTION: + self._headerfunction = val + + def perform(self): + if not isinstance(self._resp_code, int): + raise self._resp_code + if self.getopt(pycurl.URL) is None: + raise ValueError + if self._writer is None: + raise ValueError + if self._headerfunction: + self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code)) + for k, v in self._resp_headers.items(): + self._headerfunction(k + ': ' + str(v)) + if type(self._resp_body) is not bool: + self._writer(self._resp_body) + + def close(self): + pass + + def reset(self): + """Prevent fake UAs from going back into the user agent pool.""" + raise Exception + + def getinfo(self, opt): + if opt == pycurl.RESPONSE_CODE: + return self._resp_code + raise Exception + +def mock_keep_responses(body, *codes, **headers): + """Patch pycurl to return fake responses and raise exceptions. + + body can be a string to return as the response body; an exception + to raise when perform() is called; or an iterable that returns a + sequence of such values. + """ + cm = mock.MagicMock() if isinstance(body, tuple): codes = list(codes) codes.insert(0, body) - m.return_value.put.side_effect = queue_with((fake_requests_response(code, b, **headers) for b, code in codes)) + responses = [ + FakeCurl.make(code=code, body=b, headers=headers) + for b, code in codes + ] else: - m.return_value.put.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes)) - return mock.patch('requests.Session', m) + responses = [ + FakeCurl.make(code=code, body=body, headers=headers) + for code in codes + ] + cm.side_effect = queue_with(responses) + cm.responses = responses + return mock.patch('pycurl.Curl', cm) -def mock_get_responses(body, *codes, **headers): - m = mock.MagicMock() - m.return_value.get.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes)) - return mock.patch('requests.Session', m) - -def mock_get(side_effect): - m = mock.MagicMock() - m.return_value.get.side_effect = side_effect - return mock.patch('requests.Session', m) - -def mock_put(side_effect): - m = mock.MagicMock() - m.return_value.put.side_effect = side_effect - return mock.patch('requests.Session', m) class MockStreamReader(object): def __init__(self, name='.', *data): self._name = name - self._data = ''.join(data) - self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(), - len(d)) for d in data] + self._data = b''.join([ + b if isinstance(b, bytes) else b.encode() + for b in data]) + self._data_locators = [str_keep_locator(d) for d in data] self.num_retries = 0 def name(self): @@ -98,14 +190,21 @@ class MockStreamReader(object): class ApiClientMock(object): def api_client_mock(self): - return mock.MagicMock(name='api_client_mock') + api_mock = mock.MagicMock(name='api_client_mock') + api_mock.config.return_value = { + 'StorageClasses': { + 'default': {'Default': True} + } + } + return api_mock def mock_keep_services(self, api_mock=None, status=200, count=12, service_type='disk', service_host=None, service_port=None, service_ssl_flag=False, - additional_services=[]): + additional_services=[], + read_only=False): if api_mock is None: api_mock = self.api_client_mock() body = { @@ -117,6 +216,7 @@ class ApiClientMock(object): 'service_port': service_port or 65535-i, 'service_ssl_flag': service_ssl_flag, 'service_type': service_type, + 'read_only': read_only, } for i in range(0, count)] + additional_services } self._mock_api_call(api_mock.keep_services().accessible, status, body) @@ -128,7 +228,7 @@ class ApiClientMock(object): mock_method.return_value = body else: mock_method.side_effect = arvados.errors.ApiError( - fake_httplib2_response(code), "{}") + fake_httplib2_response(code), b"{}") class ArvadosBaseTestCase(unittest.TestCase): @@ -165,8 +265,45 @@ class ArvadosBaseTestCase(unittest.TestCase): tmpfile.write(leaf) return tree_root - def make_test_file(self, text="test"): + def make_test_file(self, text=b"test"): testfile = tempfile.NamedTemporaryFile() testfile.write(text) testfile.flush() return testfile + +if sys.version_info < (3, 0): + # There is no assert[Not]Regex that works in both Python 2 and 3, + # so we backport Python 3 style to Python 2. + def assertRegex(self, *args, **kwargs): + return self.assertRegexpMatches(*args, **kwargs) + def assertNotRegex(self, *args, **kwargs): + return self.assertNotRegexpMatches(*args, **kwargs) + unittest.TestCase.assertRegex = assertRegex + unittest.TestCase.assertNotRegex = assertNotRegex + +def binary_compare(a, b): + if len(a) != len(b): + return False + for i in range(0, len(a)): + if a[i] != b[i]: + return False + return True + +def make_block_cache(disk_cache): + if disk_cache: + disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep") + shutil.rmtree(disk_cache_dir, ignore_errors=True) + block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache) + return block_cache + + +class DiskCacheBase: + def make_block_cache(self, disk_cache): + self.disk_cache_dir = tempfile.mkdtemp() if disk_cache else None + block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache, + disk_cache_dir=self.disk_cache_dir) + return block_cache + + def tearDown(self): + if self.disk_cache_dir: + shutil.rmtree(self.disk_cache_dir)