8937: updated arvados_testutil.py to skip setting resp_body to writer when it is...
[arvados.git] / sdk / python / tests / arvados_testutil.py
1 #!/usr/bin/env python
2
3 import arvados
4 import errno
5 import hashlib
6 import httplib
7 import httplib2
8 import io
9 import mock
10 import os
11 import pycurl
12 import Queue
13 import shutil
14 import tempfile
15 import unittest
16
17 # Use this hostname when you want to make sure the traffic will be
18 # instantly refused.  100::/64 is a dedicated black hole.
19 TEST_HOST = '100::'
20
21 skip_sleep = mock.patch('time.sleep', lambda n: None)  # clown'll eat me
22
23 def queue_with(items):
24     """Return a thread-safe iterator that yields the given items.
25
26     +items+ can be given as an array or an iterator. If an iterator is
27     given, it will be consumed to fill the queue before queue_with()
28     returns.
29     """
30     queue = Queue.Queue()
31     for val in items:
32         queue.put(val)
33     return lambda *args, **kwargs: queue.get(block=False)
34
35 # fake_httplib2_response and mock_responses
36 # mock calls to httplib2.Http.request()
37 def fake_httplib2_response(code, **headers):
38     headers.update(status=str(code),
39                    reason=httplib.responses.get(code, "Unknown Response"))
40     return httplib2.Response(headers)
41
42 def mock_responses(body, *codes, **headers):
43     return mock.patch('httplib2.Http.request', side_effect=queue_with((
44         (fake_httplib2_response(code, **headers), body) for code in codes)))
45
46 def mock_api_responses(api_client, body, codes, headers={}):
47     return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
48         (fake_httplib2_response(code, **headers), body) for code in codes)))
49
50 def str_keep_locator(s):
51     return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
52
53 class FakeCurl:
54     @classmethod
55     def make(cls, code, body='', headers={}):
56         return mock.Mock(spec=cls, wraps=cls(code, body, headers))
57
58     def __init__(self, code=200, body='', headers={}):
59         self._opt = {}
60         self._got_url = None
61         self._writer = None
62         self._headerfunction = None
63         self._resp_code = code
64         self._resp_body = body
65         self._resp_headers = headers
66
67     def getopt(self, opt):
68         return self._opt.get(str(opt), None)
69
70     def setopt(self, opt, val):
71         self._opt[str(opt)] = val
72         if opt == pycurl.WRITEFUNCTION:
73             self._writer = val
74         elif opt == pycurl.HEADERFUNCTION:
75             self._headerfunction = val
76
77     def perform(self):
78         if not isinstance(self._resp_code, int):
79             raise self._resp_code
80         if self.getopt(pycurl.URL) is None:
81             raise ValueError
82         if self._writer is None:
83             raise ValueError
84         if self._headerfunction:
85             self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
86             for k, v in self._resp_headers.iteritems():
87                 self._headerfunction(k + ': ' + str(v))
88         if type(self._resp_body) is not bool:
89             self._writer(self._resp_body)
90
91     def close(self):
92         pass
93
94     def reset(self):
95         """Prevent fake UAs from going back into the user agent pool."""
96         raise Exception
97
98     def getinfo(self, opt):
99         if opt == pycurl.RESPONSE_CODE:
100             return self._resp_code
101         raise Exception
102
103 def mock_keep_responses(body, *codes, **headers):
104     """Patch pycurl to return fake responses and raise exceptions.
105
106     body can be a string to return as the response body; an exception
107     to raise when perform() is called; or an iterable that returns a
108     sequence of such values.
109     """
110     cm = mock.MagicMock()
111     if isinstance(body, tuple):
112         codes = list(codes)
113         codes.insert(0, body)
114         responses = [
115             FakeCurl.make(code=code, body=b, headers=headers)
116             for b, code in codes
117         ]
118     else:
119         responses = [
120             FakeCurl.make(code=code, body=body, headers=headers)
121             for code in codes
122         ]
123     cm.side_effect = queue_with(responses)
124     cm.responses = responses
125     return mock.patch('pycurl.Curl', cm)
126
127
128 class MockStreamReader(object):
129     def __init__(self, name='.', *data):
130         self._name = name
131         self._data = ''.join(data)
132         self._data_locators = [str_keep_locator(d) for d in data]
133         self.num_retries = 0
134
135     def name(self):
136         return self._name
137
138     def readfrom(self, start, size, num_retries=None):
139         return self._data[start:start + size]
140
141 class ApiClientMock(object):
142     def api_client_mock(self):
143         return mock.MagicMock(name='api_client_mock')
144
145     def mock_keep_services(self, api_mock=None, status=200, count=12,
146                            service_type='disk',
147                            service_host=None,
148                            service_port=None,
149                            service_ssl_flag=False,
150                            additional_services=[],
151                            read_only=False):
152         if api_mock is None:
153             api_mock = self.api_client_mock()
154         body = {
155             'items_available': count,
156             'items': [{
157                 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
158                 'owner_uuid': 'zzzzz-tpzed-000000000000000',
159                 'service_host': service_host or 'keep0x{:x}'.format(i),
160                 'service_port': service_port or 65535-i,
161                 'service_ssl_flag': service_ssl_flag,
162                 'service_type': service_type,
163                 'read_only': read_only,
164             } for i in range(0, count)] + additional_services
165         }
166         self._mock_api_call(api_mock.keep_services().accessible, status, body)
167         return api_mock
168
169     def _mock_api_call(self, mock_method, code, body):
170         mock_method = mock_method().execute
171         if code == 200:
172             mock_method.return_value = body
173         else:
174             mock_method.side_effect = arvados.errors.ApiError(
175                 fake_httplib2_response(code), "{}")
176
177
178 class ArvadosBaseTestCase(unittest.TestCase):
179     # This class provides common utility functions for our tests.
180
181     def setUp(self):
182         self._tempdirs = []
183
184     def tearDown(self):
185         for workdir in self._tempdirs:
186             shutil.rmtree(workdir, ignore_errors=True)
187
188     def make_tmpdir(self):
189         self._tempdirs.append(tempfile.mkdtemp())
190         return self._tempdirs[-1]
191
192     def data_file(self, filename):
193         try:
194             basedir = os.path.dirname(__file__)
195         except NameError:
196             basedir = '.'
197         return open(os.path.join(basedir, 'data', filename))
198
199     def build_directory_tree(self, tree):
200         tree_root = self.make_tmpdir()
201         for leaf in tree:
202             path = os.path.join(tree_root, leaf)
203             try:
204                 os.makedirs(os.path.dirname(path))
205             except OSError as error:
206                 if error.errno != errno.EEXIST:
207                     raise
208             with open(path, 'w') as tmpfile:
209                 tmpfile.write(leaf)
210         return tree_root
211
212     def make_test_file(self, text="test"):
213         testfile = tempfile.NamedTemporaryFile()
214         testfile.write(text)
215         testfile.flush()
216         return testfile