19 # Use this hostname when you want to make sure the traffic will be
20 # instantly refused. 100::/64 is a dedicated black hole.
23 skip_sleep = mock.patch('time.sleep', lambda n: None) # clown'll eat me
25 def queue_with(items):
26 """Return a thread-safe iterator that yields the given items.
28 +items+ can be given as an array or an iterator. If an iterator is
29 given, it will be consumed to fill the queue before queue_with()
35 return lambda *args, **kwargs: queue.get(block=False)
37 # fake_httplib2_response and mock_responses
38 # mock calls to httplib2.Http.request()
39 def fake_httplib2_response(code, **headers):
40 headers.update(status=str(code),
41 reason=httplib.responses.get(code, "Unknown Response"))
42 return httplib2.Response(headers)
44 def mock_responses(body, *codes, **headers):
45 return mock.patch('httplib2.Http.request', side_effect=queue_with((
46 (fake_httplib2_response(code, **headers), body) for code in codes)))
48 def mock_api_responses(api_client, body, codes, headers={}):
49 return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
50 (fake_httplib2_response(code, **headers), body) for code in codes)))
52 def str_keep_locator(s):
53 return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
55 @contextlib.contextmanager
56 def redirected_streams(stdout=None, stderr=None):
57 orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
58 orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
62 sys.stdout = orig_stdout
63 sys.stderr = orig_stderr
68 def make(cls, code, body='', headers={}):
69 return mock.Mock(spec=cls, wraps=cls(code, body, headers))
71 def __init__(self, code=200, body='', headers={}):
75 self._headerfunction = None
76 self._resp_code = code
77 self._resp_body = body
78 self._resp_headers = headers
80 def getopt(self, opt):
81 return self._opt.get(str(opt), None)
83 def setopt(self, opt, val):
84 self._opt[str(opt)] = val
85 if opt == pycurl.WRITEFUNCTION:
87 elif opt == pycurl.HEADERFUNCTION:
88 self._headerfunction = val
91 if not isinstance(self._resp_code, int):
93 if self.getopt(pycurl.URL) is None:
95 if self._writer is None:
97 if self._headerfunction:
98 self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
99 for k, v in self._resp_headers.iteritems():
100 self._headerfunction(k + ': ' + str(v))
101 if type(self._resp_body) is not bool:
102 self._writer(self._resp_body)
108 """Prevent fake UAs from going back into the user agent pool."""
111 def getinfo(self, opt):
112 if opt == pycurl.RESPONSE_CODE:
113 return self._resp_code
116 def mock_keep_responses(body, *codes, **headers):
117 """Patch pycurl to return fake responses and raise exceptions.
119 body can be a string to return as the response body; an exception
120 to raise when perform() is called; or an iterable that returns a
121 sequence of such values.
123 cm = mock.MagicMock()
124 if isinstance(body, tuple):
126 codes.insert(0, body)
128 FakeCurl.make(code=code, body=b, headers=headers)
133 FakeCurl.make(code=code, body=body, headers=headers)
136 cm.side_effect = queue_with(responses)
137 cm.responses = responses
138 return mock.patch('pycurl.Curl', cm)
141 class MockStreamReader(object):
142 def __init__(self, name='.', *data):
144 self._data = ''.join(data)
145 self._data_locators = [str_keep_locator(d) for d in data]
151 def readfrom(self, start, size, num_retries=None):
152 return self._data[start:start + size]
154 class ApiClientMock(object):
155 def api_client_mock(self):
156 return mock.MagicMock(name='api_client_mock')
158 def mock_keep_services(self, api_mock=None, status=200, count=12,
162 service_ssl_flag=False,
163 additional_services=[],
166 api_mock = self.api_client_mock()
168 'items_available': count,
170 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
171 'owner_uuid': 'zzzzz-tpzed-000000000000000',
172 'service_host': service_host or 'keep0x{:x}'.format(i),
173 'service_port': service_port or 65535-i,
174 'service_ssl_flag': service_ssl_flag,
175 'service_type': service_type,
176 'read_only': read_only,
177 } for i in range(0, count)] + additional_services
179 self._mock_api_call(api_mock.keep_services().accessible, status, body)
182 def _mock_api_call(self, mock_method, code, body):
183 mock_method = mock_method().execute
185 mock_method.return_value = body
187 mock_method.side_effect = arvados.errors.ApiError(
188 fake_httplib2_response(code), "{}")
191 class ArvadosBaseTestCase(unittest.TestCase):
192 # This class provides common utility functions for our tests.
198 for workdir in self._tempdirs:
199 shutil.rmtree(workdir, ignore_errors=True)
201 def make_tmpdir(self):
202 self._tempdirs.append(tempfile.mkdtemp())
203 return self._tempdirs[-1]
205 def data_file(self, filename):
207 basedir = os.path.dirname(__file__)
210 return open(os.path.join(basedir, 'data', filename))
212 def build_directory_tree(self, tree):
213 tree_root = self.make_tmpdir()
215 path = os.path.join(tree_root, leaf)
217 os.makedirs(os.path.dirname(path))
218 except OSError as error:
219 if error.errno != errno.EEXIST:
221 with open(path, 'w') as tmpfile:
225 def make_test_file(self, text="test"):
226 testfile = tempfile.NamedTemporaryFile()