11308: Fix variable name conflict.
[arvados.git] / sdk / python / tests / arvados_testutil.py
1 #!/usr/bin/env python
2
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import str
6 from builtins import range
7 from builtins import object
8 import arvados
9 import contextlib
10 import errno
11 import hashlib
12 import http.client
13 import httplib2
14 import io
15 import mock
16 import os
17 import pycurl
18 import queue
19 import shutil
20 import sys
21 import tempfile
22 import unittest
23
24 # Use this hostname when you want to make sure the traffic will be
25 # instantly refused.  100::/64 is a dedicated black hole.
26 TEST_HOST = '100::'
27
28 skip_sleep = mock.patch('time.sleep', lambda n: None)  # clown'll eat me
29
30 def queue_with(items):
31     """Return a thread-safe iterator that yields the given items.
32
33     +items+ can be given as an array or an iterator. If an iterator is
34     given, it will be consumed to fill the queue before queue_with()
35     returns.
36     """
37     q = queue.Queue()
38     for val in items:
39         q.put(val)
40     return lambda *args, **kwargs: q.get(block=False)
41
42 # fake_httplib2_response and mock_responses
43 # mock calls to httplib2.Http.request()
44 def fake_httplib2_response(code, **headers):
45     headers.update(status=str(code),
46                    reason=http.client.responses.get(code, "Unknown Response"))
47     return httplib2.Response(headers)
48
49 def mock_responses(body, *codes, **headers):
50     return mock.patch('httplib2.Http.request', side_effect=queue_with((
51         (fake_httplib2_response(code, **headers), body) for code in codes)))
52
53 def mock_api_responses(api_client, body, codes, headers={}):
54     return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
55         (fake_httplib2_response(code, **headers), body) for code in codes)))
56
57 def str_keep_locator(s):
58     return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
59
60 @contextlib.contextmanager
61 def redirected_streams(stdout=None, stderr=None):
62     orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
63     orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
64     try:
65         yield
66     finally:
67         sys.stdout = orig_stdout
68         sys.stderr = orig_stderr
69
70
71 class FakeCurl(object):
72     @classmethod
73     def make(cls, code, body='', headers={}):
74         return mock.Mock(spec=cls, wraps=cls(code, body, headers))
75
76     def __init__(self, code=200, body='', headers={}):
77         self._opt = {}
78         self._got_url = None
79         self._writer = None
80         self._headerfunction = None
81         self._resp_code = code
82         self._resp_body = body
83         self._resp_headers = headers
84
85     def getopt(self, opt):
86         return self._opt.get(str(opt), None)
87
88     def setopt(self, opt, val):
89         self._opt[str(opt)] = val
90         if opt == pycurl.WRITEFUNCTION:
91             self._writer = val
92         elif opt == pycurl.HEADERFUNCTION:
93             self._headerfunction = val
94
95     def perform(self):
96         if not isinstance(self._resp_code, int):
97             raise self._resp_code
98         if self.getopt(pycurl.URL) is None:
99             raise ValueError
100         if self._writer is None:
101             raise ValueError
102         if self._headerfunction:
103             self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
104             for k, v in self._resp_headers.items():
105                 self._headerfunction(k + ': ' + str(v))
106         if type(self._resp_body) is not bool:
107             self._writer(self._resp_body)
108
109     def close(self):
110         pass
111
112     def reset(self):
113         """Prevent fake UAs from going back into the user agent pool."""
114         raise Exception
115
116     def getinfo(self, opt):
117         if opt == pycurl.RESPONSE_CODE:
118             return self._resp_code
119         raise Exception
120
121 def mock_keep_responses(body, *codes, **headers):
122     """Patch pycurl to return fake responses and raise exceptions.
123
124     body can be a string to return as the response body; an exception
125     to raise when perform() is called; or an iterable that returns a
126     sequence of such values.
127     """
128     cm = mock.MagicMock()
129     if isinstance(body, tuple):
130         codes = list(codes)
131         codes.insert(0, body)
132         responses = [
133             FakeCurl.make(code=code, body=b, headers=headers)
134             for b, code in codes
135         ]
136     else:
137         responses = [
138             FakeCurl.make(code=code, body=body, headers=headers)
139             for code in codes
140         ]
141     cm.side_effect = queue_with(responses)
142     cm.responses = responses
143     return mock.patch('pycurl.Curl', cm)
144
145
146 class MockStreamReader(object):
147     def __init__(self, name='.', *data):
148         self._name = name
149         self._data = ''.join(data)
150         self._data_locators = [str_keep_locator(d) for d in data]
151         self.num_retries = 0
152
153     def name(self):
154         return self._name
155
156     def readfrom(self, start, size, num_retries=None):
157         return self._data[start:start + size]
158
159 class ApiClientMock(object):
160     def api_client_mock(self):
161         return mock.MagicMock(name='api_client_mock')
162
163     def mock_keep_services(self, api_mock=None, status=200, count=12,
164                            service_type='disk',
165                            service_host=None,
166                            service_port=None,
167                            service_ssl_flag=False,
168                            additional_services=[],
169                            read_only=False):
170         if api_mock is None:
171             api_mock = self.api_client_mock()
172         body = {
173             'items_available': count,
174             'items': [{
175                 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
176                 'owner_uuid': 'zzzzz-tpzed-000000000000000',
177                 'service_host': service_host or 'keep0x{:x}'.format(i),
178                 'service_port': service_port or 65535-i,
179                 'service_ssl_flag': service_ssl_flag,
180                 'service_type': service_type,
181                 'read_only': read_only,
182             } for i in range(0, count)] + additional_services
183         }
184         self._mock_api_call(api_mock.keep_services().accessible, status, body)
185         return api_mock
186
187     def _mock_api_call(self, mock_method, code, body):
188         mock_method = mock_method().execute
189         if code == 200:
190             mock_method.return_value = body
191         else:
192             mock_method.side_effect = arvados.errors.ApiError(
193                 fake_httplib2_response(code), "{}")
194
195
196 class ArvadosBaseTestCase(unittest.TestCase):
197     # This class provides common utility functions for our tests.
198
199     def setUp(self):
200         self._tempdirs = []
201
202     def tearDown(self):
203         for workdir in self._tempdirs:
204             shutil.rmtree(workdir, ignore_errors=True)
205
206     def make_tmpdir(self):
207         self._tempdirs.append(tempfile.mkdtemp())
208         return self._tempdirs[-1]
209
210     def data_file(self, filename):
211         try:
212             basedir = os.path.dirname(__file__)
213         except NameError:
214             basedir = '.'
215         return open(os.path.join(basedir, 'data', filename))
216
217     def build_directory_tree(self, tree):
218         tree_root = self.make_tmpdir()
219         for leaf in tree:
220             path = os.path.join(tree_root, leaf)
221             try:
222                 os.makedirs(os.path.dirname(path))
223             except OSError as error:
224                 if error.errno != errno.EEXIST:
225                     raise
226             with open(path, 'w') as tmpfile:
227                 tmpfile.write(leaf)
228         return tree_root
229
230     def make_test_file(self, text="test"):
231         testfile = tempfile.NamedTemporaryFile()
232         testfile.write(text)
233         testfile.flush()
234         return testfile