#!/usr/bin/env python
+import arvados
import errno
+import hashlib
import httplib
import httplib2
+import io
import mock
import os
+import pycurl
+import Queue
import shutil
import tempfile
import unittest
skip_sleep = mock.patch('time.sleep', lambda n: None) # clown'll eat me
+def queue_with(items):
+ """Return a thread-safe iterator that yields the given items.
+
+ +items+ can be given as an array or an iterator. If an iterator is
+ given, it will be consumed to fill the queue before queue_with()
+ returns.
+ """
+ queue = Queue.Queue()
+ for val in items:
+ queue.put(val)
+ return lambda *args, **kwargs: queue.get(block=False)
+
+# fake_httplib2_response and mock_responses
+# mock calls to httplib2.Http.request()
def fake_httplib2_response(code, **headers):
headers.update(status=str(code),
reason=httplib.responses.get(code, "Unknown Response"))
return httplib2.Response(headers)
-def mock_responses(body, *codes):
- return mock.patch('httplib2.Http.request', side_effect=(
- (fake_httplib2_response(code), body) for code in codes))
+def mock_responses(body, *codes, **headers):
+ return mock.patch('httplib2.Http.request', side_effect=queue_with((
+ (fake_httplib2_response(code, **headers), body) for code in codes)))
+
+
+class FakeCurl:
+ @classmethod
+ def make(cls, code, body='', headers={}):
+ return mock.Mock(spec=cls, wraps=cls(code, body, headers))
+
+ def __init__(self, code=200, body='', headers={}):
+ self._opt = {}
+ self._got_url = None
+ self._writer = None
+ self._headerfunction = None
+ self._resp_code = code
+ self._resp_body = body
+ self._resp_headers = headers
+
+ def getopt(self, opt):
+ return self._opt.get(str(opt), None)
+
+ def setopt(self, opt, val):
+ self._opt[str(opt)] = val
+ if opt == pycurl.WRITEFUNCTION:
+ self._writer = val
+ elif opt == pycurl.HEADERFUNCTION:
+ self._headerfunction = val
+
+ def perform(self):
+ if not isinstance(self._resp_code, int):
+ raise self._resp_code
+ if self.getopt(pycurl.URL) is None:
+ raise ValueError
+ if self._writer is None:
+ raise ValueError
+ if self._headerfunction:
+ self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
+ for k, v in self._resp_headers.iteritems():
+ self._headerfunction(k + ': ' + str(v))
+ self._writer(self._resp_body)
+
+ def close(self):
+ pass
+
+ def reset(self):
+ """Prevent fake UAs from going back into the user agent pool."""
+ raise Exception
+
+ def getinfo(self, opt):
+ if opt == pycurl.RESPONSE_CODE:
+ return self._resp_code
+ raise Exception
+
+def mock_keep_responses(body, *codes, **headers):
+ """Patch pycurl to return fake responses and raise exceptions.
+
+ body can be a string to return as the response body; an exception
+ to raise when perform() is called; or an iterable that returns a
+ sequence of such values.
+ """
+ cm = mock.MagicMock()
+ if isinstance(body, tuple):
+ codes = list(codes)
+ codes.insert(0, body)
+ responses = [
+ FakeCurl.make(code=code, body=b, headers=headers)
+ for b, code in codes
+ ]
+ else:
+ responses = [
+ FakeCurl.make(code=code, body=body, headers=headers)
+ for code in codes
+ ]
+ cm.side_effect = queue_with(responses)
+ cm.responses = responses
+ return mock.patch('pycurl.Curl', cm)
+
+
+class MockStreamReader(object):
+ def __init__(self, name='.', *data):
+ self._name = name
+ self._data = ''.join(data)
+ self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
+ len(d)) for d in data]
+ self.num_retries = 0
+
+ def name(self):
+ return self._name
+
+ def readfrom(self, start, size, num_retries=None):
+ return self._data[start:start + size]
+
+class ApiClientMock(object):
+ def api_client_mock(self):
+ return mock.MagicMock(name='api_client_mock')
+
+ def mock_keep_services(self, api_mock=None, status=200, count=12,
+ service_type='disk',
+ service_host=None,
+ service_port=None,
+ service_ssl_flag=False,
+ additional_services=[],
+ read_only=False):
+ if api_mock is None:
+ api_mock = self.api_client_mock()
+ body = {
+ 'items_available': count,
+ 'items': [{
+ 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
+ 'owner_uuid': 'zzzzz-tpzed-000000000000000',
+ 'service_host': service_host or 'keep0x{:x}'.format(i),
+ 'service_port': service_port or 65535-i,
+ 'service_ssl_flag': service_ssl_flag,
+ 'service_type': service_type,
+ 'read_only': read_only,
+ } for i in range(0, count)] + additional_services
+ }
+ self._mock_api_call(api_mock.keep_services().accessible, status, body)
+ return api_mock
+
+ def _mock_api_call(self, mock_method, code, body):
+ mock_method = mock_method().execute
+ if code == 200:
+ mock_method.return_value = body
+ else:
+ mock_method.side_effect = arvados.errors.ApiError(
+ fake_httplib2_response(code), "{}")
+
class ArvadosBaseTestCase(unittest.TestCase):
# This class provides common utility functions for our tests.