X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3993ba462c113090bc2db79db30838c017a5aa3b..e158f485053be1e840073b321033d60d686a55a8:/sdk/python/tests/arvados_testutil.py diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py index d7d20e8891..04ca6b5e10 100644 --- a/sdk/python/tests/arvados_testutil.py +++ b/sdk/python/tests/arvados_testutil.py @@ -1,15 +1,16 @@ #!/usr/bin/env python import errno +import hashlib import httplib import httplib2 import io import mock import os +import requests import shutil import tempfile import unittest -import requests # Use this hostname when you want to make sure the traffic will be # instantly refused. 100::/64 is a dedicated black hole. @@ -50,6 +51,21 @@ def mock_requestslib_responses(method, body, *codes, **headers): return mock.patch(method, side_effect=( fake_requests_response(code, body, **headers) for code in codes)) +class MockStreamReader(object): + def __init__(self, name='.', *data): + self._name = name + self._data = ''.join(data) + self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(), + len(d)) for d in data] + self.num_retries = 0 + + def name(self): + return self._name + + def readfrom(self, start, size, num_retries=None): + return self._data[start:start + size] + + class ArvadosBaseTestCase(unittest.TestCase): # This class provides common utility functions for our tests.