#!/usr/bin/env python
import errno
+import hashlib
import httplib
import httplib2
import io
r.raw = io.BytesIO(body)
return r
-def mock_get_responses(body, *codes, **headers):
- return mock.patch('requests.get', side_effect=(
- fake_requests_response(code, body, **headers) for code in codes))
-
def mock_put_responses(body, *codes, **headers):
- return mock.patch('requests.put', side_effect=(
- fake_requests_response(code, body, **headers) for code in codes))
+ m = mock.MagicMock()
+ if isinstance(body, tuple):
+ codes = list(codes)
+ codes.insert(0, body)
+ m.return_value.put.side_effect = (fake_requests_response(code, b, **headers) for b, code in codes)
+ else:
+ m.return_value.put.side_effect = (fake_requests_response(code, body, **headers) for code in codes)
+ return mock.patch('requests.Session', m)
-def mock_requestslib_responses(method, body, *codes, **headers):
- return mock.patch(method, side_effect=(
- fake_requests_response(code, body, **headers) for code in codes))
+def mock_get_responses(body, *codes, **headers):
+ m = mock.MagicMock()
+ m.return_value.get.side_effect = (fake_requests_response(code, body, **headers) for code in codes)
+ return mock.patch('requests.Session', m)
+
+def mock_get(side_effect):
+ m = mock.MagicMock()
+ m.return_value.get.side_effect = side_effect
+ return mock.patch('requests.Session', m)
+
+def mock_put(side_effect):
+ m = mock.MagicMock()
+ m.return_value.put.side_effect = side_effect
+ return mock.patch('requests.Session', m)
+
+class MockStreamReader(object):
+ def __init__(self, name='.', *data):
+ self._name = name
+ self._data = ''.join(data)
+ self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
+ len(d)) for d in data]
+ self.num_retries = 0
+
+ def name(self):
+ return self._name
+
+ def readfrom(self, start, size, num_retries=None):
+ self._readfrom(start, size, num_retries=num_retries)
+
+ def _readfrom(self, start, size, num_retries=None):
+ return self._data[start:start + size]
class ArvadosBaseTestCase(unittest.TestCase):
# This class provides common utility functions for our tests.