#!/usr/bin/env python
import arvados
+import contextlib
import errno
import hashlib
import httplib
import pycurl
import Queue
import shutil
+import sys
import tempfile
import unittest
def str_keep_locator(s):
    """Return a locator string of the form ``<md5 hex digest>+<length>`` for ``s``.

    ``s`` is passed directly to ``hashlib.md5`` and ``len``, so it must be a
    value both accept (a byte string).
    """
    digest = hashlib.md5(s).hexdigest()
    size = len(s)
    return '{}+{}'.format(digest, size)
@contextlib.contextmanager
def redirected_streams(stdout=None, stderr=None):
    """Temporarily replace ``sys.stdout`` and/or ``sys.stderr``.

    Arguments:
      stdout: object to install as ``sys.stdout`` for the duration of the
        ``with`` block, or ``None`` to leave ``sys.stdout`` unchanged.
      stderr: object to install as ``sys.stderr`` for the duration of the
        ``with`` block, or ``None`` to leave ``sys.stderr`` unchanged.

    The streams that were installed on entry are put back when the block
    exits, whether it finishes normally or raises.
    """
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    try:
        # Compare against None explicitly: a replacement stream may be a
        # container-like object that evaluates false while empty, and a
        # truthiness test would silently skip the redirection.
        if stdout is not None:
            sys.stdout = stdout
        if stderr is not None:
            sys.stderr = stderr
        yield
    finally:
        sys.stdout = orig_stdout
        sys.stderr = orig_stderr
+
+
# NOTE(review): presumably a test double standing in for a pycurl handle --
# confirm against the full file. This view of the class is stitched together
# from diff hunks: indentation is missing, some lines carry leading +/- diff
# markers, and parts of the class body are not visible here.
class FakeCurl:
# NOTE(review): `headers={}` is a mutable default argument; if the (not
# visible) body of make() stores or mutates it, the same dict is shared
# across calls. Consider `headers=None` -- verify against the full body.
@classmethod
def make(cls, code, body='', headers={}):
# NOTE(review): the statements below use `self`, not `cls`, so they presumably
# belong to a different method whose `def` line is not visible in this view.
# Emit a status line built from self._resp_code through the header callback.
self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
# Emit each entry of self._resp_headers as a "Name: value" header line.
for k, v in self._resp_headers.iteritems():
self._headerfunction(k + ': ' + str(v))
# The '-' line below is the pre-change code (body always written); the two '+'
# lines are its replacement, which skips the write when the body is a bool.
# NOTE(review): `isinstance(..., bool)` is the more idiomatic type check.
- self._writer(self._resp_body)
+ if type(self._resp_body) is not bool:
+ self._writer(self._resp_body)
# Intentionally a no-op.
def close(self):
pass