#!/usr/bin/env python
+from future import standard_library
+standard_library.install_aliases()
+from builtins import str
+from builtins import range
+from builtins import object
import arvados
+import contextlib
import errno
import hashlib
-import httplib
+import http.client
import httplib2
import io
import mock
import os
import pycurl
-import Queue
+import queue
import shutil
+import sys
import tempfile
import unittest
given, it will be consumed to fill the queue before queue_with()
returns.
"""
- queue = Queue.Queue()
+ q = queue.Queue()
for val in items:
- queue.put(val)
- return lambda *args, **kwargs: queue.get(block=False)
+ q.put(val)
+ return lambda *args, **kwargs: q.get(block=False)
# fake_httplib2_response and mock_responses
# mock calls to httplib2.Http.request()
def fake_httplib2_response(code, **headers):
headers.update(status=str(code),
- reason=httplib.responses.get(code, "Unknown Response"))
+ reason=http.client.responses.get(code, "Unknown Response"))
return httplib2.Response(headers)
def mock_responses(body, *codes, **headers):
def str_keep_locator(s):
return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
-class FakeCurl:
+@contextlib.contextmanager
+def redirected_streams(stdout=None, stderr=None):
+ orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
+ orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
+ try:
+ yield
+ finally:
+ sys.stdout = orig_stdout
+ sys.stderr = orig_stderr
+
+
+class FakeCurl(object):
@classmethod
def make(cls, code, body='', headers={}):
return mock.Mock(spec=cls, wraps=cls(code, body, headers))
raise ValueError
if self._headerfunction:
self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
- for k, v in self._resp_headers.iteritems():
+ for k, v in self._resp_headers.items():
self._headerfunction(k + ': ' + str(v))
if type(self._resp_body) is not bool:
self._writer(self._resp_body)