-#!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+from future import standard_library
+standard_library.install_aliases()
+from builtins import str
+from builtins import range
+from builtins import object
import arvados
+import contextlib
import errno
import hashlib
-import httplib
+import http.client
import httplib2
import io
import mock
import os
-import Queue
-import requests
+import pycurl
+import queue
import shutil
+import sys
import tempfile
import unittest
+if sys.version_info >= (3, 0):
+ from io import StringIO, BytesIO
+else:
+ from cStringIO import StringIO
+ BytesIO = StringIO
+
# Use this hostname when you want to make sure the traffic will be
# instantly refused. 100::/64 is a dedicated black hole.
TEST_HOST = '100::'
given, it will be consumed to fill the queue before queue_with()
returns.
"""
- queue = Queue.Queue()
+ q = queue.Queue()
for val in items:
- queue.put(val)
- return lambda *args, **kwargs: queue.get(block=False)
+ q.put(val)
+ return lambda *args, **kwargs: q.get(block=False)
# fake_httplib2_response and mock_responses
# mock calls to httplib2.Http.request()
def fake_httplib2_response(code, **headers):
headers.update(status=str(code),
- reason=httplib.responses.get(code, "Unknown Response"))
+ reason=http.client.responses.get(code, "Unknown Response"))
return httplib2.Response(headers)
def mock_responses(body, *codes, **headers):
+ if not isinstance(body, bytes) and hasattr(body, 'encode'):
+ body = body.encode()
return mock.patch('httplib2.Http.request', side_effect=queue_with((
(fake_httplib2_response(code, **headers), body) for code in codes)))
-# fake_requests_response, mock_get_responses and mock_put_responses
-# mock calls to requests.get() and requests.put()
-def fake_requests_response(code, body, **headers):
- r = requests.Response()
- r.status_code = code
- r.reason = httplib.responses.get(code, "Unknown Response")
- r.headers = headers
- r.raw = io.BytesIO(body)
- return r
-
-# The following methods patch requests.Session(), where return_value is a mock
-# Session object. The put/get attributes are set on mock Session, and the
-# desired put/get behavior is set on the put/get mocks.
-
-def mock_put_responses(body, *codes, **headers):
- m = mock.MagicMock()
+def mock_api_responses(api_client, body, codes, headers=None):
+    """Patch ``api_client._http.request`` to return canned responses.
+
+    One ``(response, body)`` pair is queued per item in ``codes``; each
+    call to the patched ``request`` returns the next pair.  Every pair
+    shares the same ``body`` and ``headers``.
+
+    :param api_client: object whose ``_http.request`` attribute is patched.
+    :param body: response body.  A value that is not ``bytes`` but has an
+      ``encode`` method is encoded before being queued.
+    :param codes: iterable of HTTP status codes, one per expected request.
+    :param headers: optional mapping of extra response header fields.
+    :returns: the patcher created by ``mock.patch.object``.
+    """
+    # Use None as the default so calls never share one mutable dict.
+    if headers is None:
+        headers = {}
+    if not isinstance(body, bytes) and hasattr(body, 'encode'):
+        body = body.encode()
+    return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
+        (fake_httplib2_response(code, **headers), body) for code in codes)))
+
+def str_keep_locator(s):
+    """Return the ``<md5 hexdigest>+<size>`` locator string for ``s``.
+
+    ``s`` may be ``bytes`` or text; text is encoded with ``str.encode()``
+    before hashing.  The size is taken from the encoded bytes so that it
+    always agrees with the data that was hashed, including for text
+    containing non-ASCII characters.
+    """
+    data = s if isinstance(s, bytes) else s.encode()
+    return '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+
+@contextlib.contextmanager
+def redirected_streams(stdout=None, stderr=None):
+    """Temporarily replace ``sys.stdout`` and/or ``sys.stderr``.
+
+    Passing the ``StringIO`` class itself for either argument creates a
+    fresh ``StringIO`` instance for that stream.  A falsy argument leaves
+    the corresponding ``sys`` stream as it is.  The context yields the
+    ``(stdout, stderr)`` pair after that substitution, and the original
+    streams are restored on exit.
+    """
+    new_out = StringIO() if stdout == StringIO else stdout
+    new_err = StringIO() if stderr == StringIO else stderr
+    saved_out, saved_err = sys.stdout, sys.stderr
+    sys.stdout = new_out or saved_out
+    sys.stderr = new_err or saved_err
+    try:
+        yield (new_out, new_err)
+    finally:
+        # Restore both streams even if the managed body raised.
+        sys.stdout = saved_out
+        sys.stderr = saved_err
+
+
+class VersionChecker(object):
+    """Mixin that checks the captured output of a ``--version`` style run.
+
+    The class using it must provide ``assertEqual`` and ``assertRegex``.
+    """
+
+    def assertVersionOutput(self, out, err):
+        """Assert one stream is empty and the other ends in a version.
+
+        ``out`` and ``err`` must provide ``getvalue()``.  On Python 3
+        the version is expected on ``out``; Python 2 writes version info
+        on stderr, so the roles are swapped there.
+        """
+        if sys.version_info >= (3, 0):
+            silent_stream, version_stream = err, out
+        else:
+            silent_stream, version_stream = out, err
+        self.assertEqual(silent_stream.getvalue(), '')
+        self.assertRegex(version_stream.getvalue(),
+                         r"[0-9]+\.[0-9]+\.[0-9]+(\.dev[0-9]+)?$\n")
+
+
+class FakeCurl(object):
+    """Stand-in for a ``pycurl.Curl`` handle that replays one canned response.
+
+    Options set through ``setopt`` are recorded; ``perform`` then feeds
+    the configured status line, headers and body to the registered
+    header and write callbacks, or raises the configured exception.
+    """
+
+    @classmethod
+    def make(cls, code, body=b'', headers=None):
+        """Return a ``mock.Mock`` wrapping a new ``FakeCurl``.
+
+        A ``body`` that is not ``bytes`` but has an ``encode`` method is
+        encoded first.  The mock uses ``spec=cls`` and ``wraps`` the real
+        fake, so calls are both recorded and executed.
+        """
+        if not isinstance(body, bytes) and hasattr(body, 'encode'):
+            body = body.encode()
+        return mock.Mock(spec=cls, wraps=cls(code, body, headers))
+
+    def __init__(self, code=200, body=b'', headers=None):
+        """Store the canned response.
+
+        :param code: int status code, or an exception for ``perform`` to
+          raise.
+        :param body: value passed to the write callback; a ``bool`` body
+          suppresses the write entirely.
+        :param headers: optional mapping of response header fields.
+        """
+        self._opt = {}
+        self._got_url = None
+        self._writer = None
+        self._headerfunction = None
+        self._resp_code = code
+        self._resp_body = body
+        # A fresh dict per instance: a shared mutable default would leak
+        # header changes from one fake into every other.
+        self._resp_headers = {} if headers is None else headers
+
+    def getopt(self, opt):
+        """Return the value last given to ``setopt`` for ``opt``, or None."""
+        return self._opt.get(str(opt), None)
+
+    def setopt(self, opt, val):
+        """Record an option and capture the write/header callbacks."""
+        self._opt[str(opt)] = val
+        if opt == pycurl.WRITEFUNCTION:
+            self._writer = val
+        elif opt == pycurl.HEADERFUNCTION:
+            self._headerfunction = val
+
+    def perform(self):
+        """Replay the canned response through the registered callbacks.
+
+        :raises: the configured ``code`` itself when it is not an int.
+        :raises ValueError: if no URL or no write callback has been set.
+        """
+        if not isinstance(self._resp_code, int):
+            raise self._resp_code
+        if self.getopt(pycurl.URL) is None:
+            raise ValueError("FakeCurl.perform() called before pycurl.URL was set")
+        if self._writer is None:
+            raise ValueError("FakeCurl.perform() called before pycurl.WRITEFUNCTION was set")
+        if self._headerfunction:
+            # Header lines are passed as text, one call per line, with no
+            # trailing line terminator.
+            self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
+            for k, v in self._resp_headers.items():
+                self._headerfunction(k + ': ' + str(v))
+        if not isinstance(self._resp_body, bool):
+            self._writer(self._resp_body)
+
+    def close(self):
+        """Do nothing; the fake holds no resources."""
+        pass
+
+    def reset(self):
+        """Prevent fake UAs from going back into the user agent pool."""
+        raise Exception("FakeCurl does not support reset()")
+
+    def getinfo(self, opt):
+        """Return the status code for ``pycurl.RESPONSE_CODE``.
+
+        :raises Exception: for any other ``opt``.
+        """
+        if opt == pycurl.RESPONSE_CODE:
+            return self._resp_code
+        raise Exception("FakeCurl.getinfo() only supports pycurl.RESPONSE_CODE")
+
+def mock_keep_responses(body, *codes, **headers):
+ """Patch pycurl.Curl to hand out a queue of FakeCurl responses.
+
+ If body is a tuple, body and every item of codes are unpacked as
+ (body, code) pairs, and one fake response is built per pair.
+ Otherwise body is shared by one fake response per item of codes.
+ A code that is not an int is raised by FakeCurl.perform() instead
+ of producing a response, so put an exception in the code position
+ to simulate a failed transfer.
+
+ headers are passed to every fake response. The list of fakes is
+ also stored on the patched pycurl.Curl mock as its responses
+ attribute.
+ """
+ cm = mock.MagicMock()
if isinstance(body, tuple):
codes = list(codes)
codes.insert(0, body)
- m.return_value.put.side_effect = queue_with((fake_requests_response(code, b, **headers) for b, code in codes))
+ responses = [
+ FakeCurl.make(code=code, body=b, headers=headers)
+ for b, code in codes
+ ]
else:
- m.return_value.put.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes))
- return mock.patch('requests.Session', m)
+ responses = [
+ FakeCurl.make(code=code, body=body, headers=headers)
+ for code in codes
+ ]
+ cm.side_effect = queue_with(responses)
+ cm.responses = responses
+ return mock.patch('pycurl.Curl', cm)
-def mock_get_responses(body, *codes, **headers):
- m = mock.MagicMock()
- m.return_value.get.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes))
- return mock.patch('requests.Session', m)
-
-def mock_get(side_effect):
- m = mock.MagicMock()
- m.return_value.get.side_effect = side_effect
- return mock.patch('requests.Session', m)
-
-def mock_put(side_effect):
- m = mock.MagicMock()
- m.return_value.put.side_effect = side_effect
- return mock.patch('requests.Session', m)
class MockStreamReader(object):
def __init__(self, name='.', *data):
self._name = name
- self._data = ''.join(data)
- self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
- len(d)) for d in data]
+ self._data = b''.join([
+ b if isinstance(b, bytes) else b.encode()
+ for b in data])
+ self._data_locators = [str_keep_locator(d) for d in data]
self.num_retries = 0
def name(self):
return self._name
def readfrom(self, start, size, num_retries=None):
- self._readfrom(start, size, num_retries=num_retries)
-
- def _readfrom(self, start, size, num_retries=None):
return self._data[start:start + size]
class ApiClientMock(object):
def api_client_mock(self):
- return mock.MagicMock(name='api_client_mock')
+ api_mock = mock.MagicMock(name='api_client_mock')
+ api_mock.config.return_value = {
+ 'StorageClasses': {
+ 'default': {'Default': True}
+ }
+ }
+ return api_mock
def mock_keep_services(self, api_mock=None, status=200, count=12,
service_type='disk',
service_host=None,
service_port=None,
- service_ssl_flag=False):
+ service_ssl_flag=False,
+ additional_services=[],
+ read_only=False):
if api_mock is None:
api_mock = self.api_client_mock()
body = {
'service_port': service_port or 65535-i,
'service_ssl_flag': service_ssl_flag,
'service_type': service_type,
- } for i in range(0, count)]
+ 'read_only': read_only,
+ } for i in range(0, count)] + additional_services
}
self._mock_api_call(api_mock.keep_services().accessible, status, body)
return api_mock
mock_method.return_value = body
else:
mock_method.side_effect = arvados.errors.ApiError(
- fake_httplib2_response(code), "{}")
+ fake_httplib2_response(code), b"{}")
class ArvadosBaseTestCase(unittest.TestCase):
tmpfile.write(leaf)
return tree_root
- def make_test_file(self, text="test"):
+ def make_test_file(self, text=b"test"):
testfile = tempfile.NamedTemporaryFile()
testfile.write(text)
testfile.flush()
return testfile
+
+if sys.version_info[0] < 3:
+    # Python 2's TestCase only has assertRegexpMatches and
+    # assertNotRegexpMatches.  No single spelling works on both major
+    # versions, so the Python 3 names are attached to TestCase here.
+    def assertRegex(self, *args, **kwargs):
+        """Forward to the Python 2 name ``assertRegexpMatches``."""
+        return self.assertRegexpMatches(*args, **kwargs)
+
+    def assertNotRegex(self, *args, **kwargs):
+        """Forward to the Python 2 name ``assertNotRegexpMatches``."""
+        return self.assertNotRegexpMatches(*args, **kwargs)
+
+    unittest.TestCase.assertNotRegex = assertNotRegex
+    unittest.TestCase.assertRegex = assertRegex