X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f16be1736d705278ae39bde3fc6d6d9d1f302fa7..d14dd75b263d8f999603b66d23f74667d36a2412:/sdk/python/tests/arvados_testutil.py diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py index 369d561804..d138e5964a 100644 --- a/sdk/python/tests/arvados_testutil.py +++ b/sdk/python/tests/arvados_testutil.py @@ -1,11 +1,172 @@ #!/usr/bin/env python +import arvados import errno +import hashlib +import httplib +import httplib2 +import io +import mock import os +import pycurl +import Queue import shutil import tempfile import unittest +# Use this hostname when you want to make sure the traffic will be +# instantly refused. 100::/64 is a dedicated black hole. +TEST_HOST = '100::' + +skip_sleep = mock.patch('time.sleep', lambda n: None) # clown'll eat me + +def queue_with(items): + """Return a thread-safe iterator that yields the given items. + + +items+ can be given as an array or an iterator. If an iterator is + given, it will be consumed to fill the queue before queue_with() + returns. + """ + queue = Queue.Queue() + for val in items: + queue.put(val) + return lambda *args, **kwargs: queue.get(block=False) + +# fake_httplib2_response and mock_responses +# mock calls to httplib2.Http.request() +def fake_httplib2_response(code, **headers): + headers.update(status=str(code), + reason=httplib.responses.get(code, "Unknown Response")) + return httplib2.Response(headers) + +def mock_responses(body, *codes, **headers): + return mock.patch('httplib2.Http.request', side_effect=queue_with(( + (fake_httplib2_response(code, **headers), body) for code in codes))) + + +class FakeCurl: + @classmethod + def make(cls, code, body='', headers={}): + return mock.Mock(spec=cls, wraps=cls(code, body, headers)) + + def __init__(self, code=200, body='', headers={}): + self._opt = {} + self._got_url = None + self._writer = None + self._headerfunction = None + self._resp_code = code + self._resp_body = body + self._resp_headers = headers + + def getopt(self, opt): + return self._opt.get(str(opt), None) + + def setopt(self, opt, val): + self._opt[str(opt)] = val + if opt == pycurl.WRITEFUNCTION: + self._writer = val + elif opt == pycurl.HEADERFUNCTION: + self._headerfunction = val + + def perform(self): + if not isinstance(self._resp_code, int): + raise self._resp_code + if self.getopt(pycurl.URL) is None: + raise ValueError + if self._writer is None: + raise ValueError + if self._headerfunction: + self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code)) + for k, v in self._resp_headers.iteritems(): + self._headerfunction(k + ': ' + str(v)) + self._writer(self._resp_body) + + def close(self): + pass + + def reset(self): + """Prevent fake UAs from going back into the user agent pool.""" + raise Exception + + def getinfo(self, opt): + if opt == pycurl.RESPONSE_CODE: + return self._resp_code + raise Exception + +def mock_keep_responses(body, *codes, **headers): + """Patch pycurl to return fake responses and raise exceptions. + + body can be a string to return as the response body; an exception + to raise when perform() is called; or an iterable that returns a + sequence of such values. + """ + cm = mock.MagicMock() + if isinstance(body, tuple): + codes = list(codes) + codes.insert(0, body) + responses = [ + FakeCurl.make(code=code, body=b, headers=headers) + for b, code in codes + ] + else: + responses = [ + FakeCurl.make(code=code, body=body, headers=headers) + for code in codes + ] + cm.side_effect = queue_with(responses) + cm.responses = responses + return mock.patch('pycurl.Curl', cm) + + +class MockStreamReader(object): + def __init__(self, name='.', *data): + self._name = name + self._data = ''.join(data) + self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(), + len(d)) for d in data] + self.num_retries = 0 + + def name(self): + return self._name + + def readfrom(self, start, size, num_retries=None): + return self._data[start:start + size] + +class ApiClientMock(object): + def api_client_mock(self): + return mock.MagicMock(name='api_client_mock') + + def mock_keep_services(self, api_mock=None, status=200, count=12, + service_type='disk', + service_host=None, + service_port=None, + service_ssl_flag=False, + additional_services=[]): + if api_mock is None: + api_mock = self.api_client_mock() + body = { + 'items_available': count, + 'items': [{ + 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i), + 'owner_uuid': 'zzzzz-tpzed-000000000000000', + 'service_host': service_host or 'keep0x{:x}'.format(i), + 'service_port': service_port or 65535-i, + 'service_ssl_flag': service_ssl_flag, + 'service_type': service_type, + } for i in range(0, count)] + additional_services + } + self._mock_api_call(api_mock.keep_services().accessible, status, body) + return api_mock + + def _mock_api_call(self, mock_method, code, body): + mock_method = mock_method().execute + if code == 200: + mock_method.return_value = body + else: + mock_method.side_effect = arvados.errors.ApiError( + fake_httplib2_response(code), "{}") + + class ArvadosBaseTestCase(unittest.TestCase): # This class provides common utility functions for our tests.