5562: Use pycurl library (instead of requests) for Keep transactions.
author    Tom Clegg <tom@curoverse.com>    Wed, 22 Apr 2015 18:26:43 +0000 (14:26 -0400)
committer Tom Clegg <tom@curoverse.com>    Wed, 29 Apr 2015 04:39:29 +0000 (00:39 -0400)
sdk/python/arvados/errors.py
sdk/python/arvados/keep.py
sdk/python/arvados/retry.py
sdk/python/setup.py
sdk/python/tests/arvados_testutil.py
sdk/python/tests/test_collections.py
sdk/python/tests/test_errors.py
sdk/python/tests/test_keep_client.py
sdk/python/tests/test_retry.py
sdk/python/tests/test_stream.py

diff --git a/sdk/python/arvados/errors.py b/sdk/python/arvados/errors.py
index 3629520a4d5f7216a86a5b085585e4f0756b3581..bfd471ba52bee712a1e1768c91327ce28a9c6603 100644
--- a/sdk/python/arvados/errors.py
+++ b/sdk/python/arvados/errors.py
@@ -1,7 +1,6 @@
 # errors.py - Arvados-specific exceptions.
 
 import json
-import requests
 
 from apiclient import errors as apiclient_errors
 from collections import OrderedDict
@@ -46,7 +45,7 @@ class KeepRequestError(Exception):
         self.message = message
 
     def _format_error(self, key, error):
-        if isinstance(error, requests.Response):
+        if isinstance(error, HttpError):
             err_fmt = "{} {} responded with {e.status_code} {e.reason}"
         else:
             err_fmt = "{} {} raised {e.__class__.__name__} ({e})"
@@ -61,6 +60,12 @@ class KeepRequestError(Exception):
         return self._request_errors
 
 
+class HttpError(Exception):
+    def __init__(self, status_code, reason):
+        self.status_code = status_code
+        self.reason = reason
+
+
 class ArgumentError(Exception):
     pass
 class SyntaxError(Exception):
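
Outside the diff, a small standalone sketch of the distinction the updated _format_error draws: an HttpError is reported with its status code and reason, anything else with its class name. The helper function, "service" label, and URLs below are illustrative, not values from the commit.

    class HttpError(Exception):
        # Mirrors the class added above: carries a numeric status plus a reason string.
        def __init__(self, status_code, reason):
            self.status_code = status_code
            self.reason = reason

    def format_error(key, error):
        # Same branching as KeepRequestError._format_error in the diff above.
        if isinstance(error, HttpError):
            err_fmt = "{} {} responded with {e.status_code} {e.reason}"
        else:
            err_fmt = "{} {} raised {e.__class__.__name__} ({e})"
        return err_fmt.format(key, "service", e=error)

    print(format_error("http://keep5.example.org/", HttpError(500, "Internal Server Error")))
    # http://keep5.example.org/ service responded with 500 Internal Server Error
    print(format_error("http://keep1.example.org/", ValueError("bad locator")))
    # http://keep1.example.org/ service raised ValueError (bad locator)
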
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 842a36d8ed6145062f166347f4b79126eadce196..b26285e8fc6f3f54f26bba340f2ba9b85f977d2a 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -1,25 +1,28 @@
+import bz2
+import datetime
+import fcntl
+import functools
 import gflags
+import hashlib
+import json
 import logging
 import os
 import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
 import re
-import hashlib
+import socket
+import ssl
 import string
-import bz2
-import zlib
-import fcntl
-import time
+import StringIO
+import subprocess
+import sys
 import threading
+import time
 import timer
-import datetime
-import ssl
-import socket
-import requests
+import types
+import UserDict
+import zlib
 
 import arvados
 import arvados.config as config
@@ -27,25 +30,10 @@ import arvados.errors
 import arvados.retry as retry
 import arvados.util
 
-try:
-    # Workaround for urllib3 bug.
-    # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
-    # However, urllib3 prior to version 1.10 has a major bug in this feature
-    # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
-    # Unfortunately Debian 8 is stabilizing on urllib3 1.9.1 which means the
-    # following workaround is necessary to be able to use
-    # the arvados python sdk with the distribution-provided packages.
-    import urllib3
-    from pkg_resources import parse_version
-    if parse_version(urllib3.__version__) < parse_version('1.10'):
-        from urllib3.contrib import pyopenssl
-        pyopenssl.extract_from_urllib3()
-except ImportError:
-    pass
-
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -301,75 +289,189 @@ class KeepClient(object):
 
 
     class KeepService(object):
-        # Make requests to a single Keep service, and track results.
-        HTTP_ERRORS = (requests.exceptions.RequestException,
-                       socket.error, ssl.SSLError)
+        """Make requests to a single Keep service, and track results.
+
+        A KeepService is intended to last long enough to perform one
+        transaction (GET or PUT) against one Keep service. This can
+        involve calling either get() or put() multiple times in order
+        to retry after transient failures. However, calling both get()
+        and put() on a single instance -- or using the same instance
+        to access two different Keep services -- will not produce
+        sensible behavior.
+        """
 
-        def __init__(self, root, session, **headers):
+        HTTP_ERRORS = (
+            socket.error,
+            ssl.SSLError,
+            arvados.errors.HttpError,
+        )
+
+        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
             self.root = root
-            self.last_result = None
-            self.success_flag = None
-            self.session = session
+            self._user_agent_pool = user_agent_pool
+            self._result = {'error': None}
+            self._usable = True
+            self._session = None
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
 
         def usable(self):
-            return self.success_flag is not False
+            """Is it worth attempting a request?"""
+            return self._usable
 
         def finished(self):
-            return self.success_flag is not None
+            """Did the request succeed or encounter permanent failure?"""
+            return self._result['error'] == False or not self._usable
+
+        def last_result(self):
+            return self._result
 
-        def last_status(self):
+        def _get_user_agent(self):
             try:
-                return self.last_result.status_code
-            except AttributeError:
-                return None
+                return self._user_agent_pool.get(False)
+            except Queue.Empty:
+                return pycurl.Curl()
+
+        def _put_user_agent(self, ua):
+            try:
+                ua.reset()
+                self._user_agent_pool.put(ua, False)
+            except:
+                ua.close()
 
         def get(self, locator, timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
+            curl = self._get_user_agent()
             try:
                 with timer.Timer() as t:
-                    result = self.session.get(url.encode('utf-8'),
-                                          headers=self.get_headers,
-                                          timeout=timeout)
+                    self._headers = {}
+                    response_body = StringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEDATA, response_body)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    self._setcurltimeouts(curl, timeout)
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
             except self.HTTP_ERRORS as e:
+                self._result = {
+                    'error': e,
+                }
+                ok = False
+            self._usable = ok != False
+            if not ok:
                 _logger.debug("Request fail: GET %s => %s: %s",
-                              url, type(e), str(e))
-                self.last_result = e
-            else:
-                self.last_result = result
-                self.success_flag = retry.check_http_response_success(result)
-                content = result.content
-                _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
-                             self.last_status(), len(content), t.msecs,
-                             (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
-                if self.success_flag:
-                    resp_md5 = hashlib.md5(content).hexdigest()
-                    if resp_md5 == locator.md5sum:
-                        return content
-                    _logger.warning("Checksum fail: md5(%s) = %s",
-                                    url, resp_md5)
-            return None
+                              url, type(self._result['error']), str(self._result['error']))
+                # Don't return this ua to the pool, in case it's broken.
+                curl.close()
+                return None
+            self._put_user_agent(curl)
+            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(self._result['body']),
+                         t.msecs,
+                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
+            if resp_md5 != locator.md5sum:
+                _logger.warning("Checksum fail: md5(%s) = %s",
+                                url, resp_md5)
+                self._result['error'] = arvados.errors.HttpError(
+                    0, 'Checksum fail')
+                return None
+            return self._result['body']
 
         def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
+            curl = self._get_user_agent()
             try:
-                result = self.session.put(url.encode('utf-8'),
-                                      data=body,
-                                      headers=self.put_headers,
-                                      timeout=timeout)
+                self._headers = {}
+                response_body = StringIO.StringIO()
+                curl.setopt(pycurl.NOSIGNAL, 1)
+                curl.setopt(pycurl.URL, url.encode('utf-8'))
+                curl.setopt(pycurl.POSTFIELDS, body)
+                curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
+                curl.setopt(pycurl.HTTPHEADER, [
+                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                curl.setopt(pycurl.WRITEDATA, response_body)
+                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                self._setcurltimeouts(curl, timeout)
+                try:
+                    curl.perform()
+                except Exception as e:
+                    raise arvados.errors.HttpError(0, str(e))
+                self._result = {
+                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                    'body': response_body.getvalue(),
+                    'headers': self._headers,
+                    'error': False,
+                }
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
             except self.HTTP_ERRORS as e:
+                self._result = {
+                    'error': e,
+                }
+                ok = False
+            self._usable = ok != False # still usable if ok is True or None
+            if not ok:
                 _logger.debug("Request fail: PUT %s => %s: %s",
-                              url, type(e), str(e))
-                self.last_result = e
+                              url, type(self._result['error']), str(self._result['error']))
+                # Don't return this ua to the pool, in case it's broken.
+                curl.close()
+                return False
+            self._put_user_agent(curl)
+            return True
+
+        def _setcurltimeouts(self, curl, timeouts):
+            if not timeouts:
+                return
+            elif isinstance(timeouts, tuple):
+                conn_t, xfer_t = timeouts
+            else:
+                conn_t, xfer_t = (timeouts, timeouts)
+            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+
+        def _headerfunction(self, header_line):
+            header_line = header_line.decode('iso-8859-1')
+            if ':' in header_line:
+                name, value = header_line.split(':', 1)
+                name = name.strip().lower()
+                value = value.strip()
+            elif self._headers:
+                name = self._lastheadername
+                value = self._headers[name] + ' ' + header_line.strip()
+            elif header_line.startswith('HTTP/'):
+                name = 'x-status-line'
+                value = header_line
             else:
-                self.last_result = result
-                self.success_flag = retry.check_http_response_success(result)
-            return self.success_flag
+                _logger.error("Unexpected header line: %s", header_line)
+                return
+            self._lastheadername = name
+            self._headers[name] = value
+            # Returning None implies all bytes were written
 
 
     class KeepWriterThread(threading.Thread):
@@ -407,9 +509,8 @@ class KeepClient(object):
                 self.args['data_hash'],
                 self.args['data'],
                 timeout=self.args.get('timeout', None)))
-            status = self.service.last_status()
+            result = self.service.last_result()
             if self._success:
-                result = self.service.last_result
                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
@@ -420,14 +521,15 @@ class KeepClient(object):
                 # we're talking to a proxy or other backend that
                 # stores to multiple copies for us.
                 try:
-                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
+                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                 except (KeyError, ValueError):
                     replicas_stored = 1
-                limiter.save_response(result.content.strip(), replicas_stored)
-            elif status is not None:
+                limiter.save_response(result['body'].strip(), replicas_stored)
+            elif result.get('status_code', None):
                 _logger.debug("Request fail: PUT %s => %s %s",
-                              self.args['data_hash'], status,
-                              self.service.last_result.content)
+                              self.args['data_hash'],
+                              result['status_code'],
+                              result['body'])
 
 
     def __init__(self, api_client=None, proxy=None,
@@ -484,10 +586,6 @@ class KeepClient(object):
           The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
           put() are called.  Default 0.
-
-        :session:
-          The requests.Session object to use for get() and put() requests.
-          Will create one if not specified.
         """
         self.lock = threading.Lock()
         if proxy is None:
@@ -506,6 +604,7 @@ class KeepClient(object):
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
+        self._user_agent_pool = Queue.LifoQueue()
 
         if local_store:
             self.local_store = local_store
@@ -513,7 +612,6 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
-            self.session = session if session is not None else requests.Session()
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -568,9 +666,13 @@ class KeepClient(object):
 
             # Precompute the base URI for each service.
             for r in accessible:
-                r['_service_root'] = "{}://[{}]:{:d}/".format(
+                host = r['service_host']
+                if not host.startswith('[') and host.find(':') >= 0:
+                    # IPv6 URIs must be formatted like http://[::1]:80/...
+                    host = '[' + host + ']'
+                r['_service_root'] = "{}://{}:{:d}/".format(
                     'https' if r['service_ssl_flag'] else 'http',
-                    r['service_host'],
+                    host,
                     r['service_port'])
 
             # Gateway services are only used when specified by UUID,
@@ -638,7 +740,8 @@ class KeepClient(object):
         local_roots = self.weighted_service_roots(locator, force_rebuild)
         for root in local_roots:
             if root not in roots_map:
-                roots_map[root] = self.KeepService(root, self.session, **headers)
+                roots_map[root] = self.KeepService(
+                    root, self._user_agent_pool, **headers)
         return local_roots
 
     @staticmethod
@@ -709,7 +812,10 @@ class KeepClient(object):
                                    self._gateway_services.get(hint[2:])
                                    )])
         # Map root URLs to their KeepService objects.
-        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+        roots_map = {
+            root: self.KeepService(root, self._user_agent_pool)
+            for root in hint_roots
+        }
 
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
@@ -750,8 +856,8 @@ class KeepClient(object):
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
         not_founds = sum(1 for key in sorted_roots
-                         if roots_map[key].last_status() in {403, 404, 410})
-        service_errors = ((key, roots_map[key].last_result)
+                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+        service_errors = ((key, roots_map[key].last_result()['error'])
                           for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
@@ -835,9 +941,9 @@ class KeepClient(object):
                 "failed to write {}: no Keep services available ({})".format(
                     data_hash, loop.last_result()))
         else:
-            service_errors = ((key, roots_map[key].last_result)
+            service_errors = ((key, roots_map[key].last_result()['error'])
                               for key in local_roots
-                              if not roots_map[key].success_flag)
+                              if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
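
For reference, a minimal standalone sketch (not from the commit) of the pycurl request pattern KeepService.get() now follows: body captured via WRITEDATA, headers via HEADERFUNCTION, status via RESPONSE_CODE, and the (connect, transfer) timeout pair mapped to CONNECTTIMEOUT_MS/TIMEOUT_MS. The fetch() helper and its default timeouts are illustrative, not part of the SDK.

    import pycurl
    import StringIO

    def fetch(url, timeouts=(2, 300), headers=None):
        # Perform one GET and return (status_code, body, response_headers).
        resp_headers = {}
        def headerfunction(line):
            line = line.decode('iso-8859-1')
            if ':' in line:
                name, value = line.split(':', 1)
                resp_headers[name.strip().lower()] = value.strip()
            elif line.startswith('HTTP/'):
                # Keep the status line, as KeepService does under 'x-status-line'.
                resp_headers['x-status-line'] = line.strip()
        body = StringIO.StringIO()
        curl = pycurl.Curl()
        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.URL, url.encode('utf-8'))
        curl.setopt(pycurl.HTTPHEADER,
                    ['{}: {}'.format(k, v) for k, v in (headers or {}).iteritems()])
        curl.setopt(pycurl.WRITEDATA, body)
        curl.setopt(pycurl.HEADERFUNCTION, headerfunction)
        conn_t, xfer_t = timeouts
        curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
        curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
        curl.perform()
        status = curl.getinfo(pycurl.RESPONSE_CODE)
        curl.close()
        return status, body.getvalue(), resp_headers

    # e.g. status, data, hdrs = fetch('http://keep.example/acbd18db4cc2f85cedef654fccc4a4d8+3',
    #                                 headers={'Accept': 'application/octet-stream'})
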
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 52a68faa6f6b511bf78378cc944aa6e4c5914c33..e4ad6440a7130e0ff0b4891d3b516cc4a7531c9d 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -2,6 +2,7 @@
 
 import functools
 import inspect
+import pycurl
 import time
 
 from collections import deque
@@ -109,11 +110,11 @@ class RetryLoop(object):
                 "queried loop results before any were recorded")
 
 
-def check_http_response_success(result):
-    """Convert a 'requests' response to a loop control flag.
+def check_http_response_success(status_code):
+    """Convert an HTTP status code to a loop control flag.
 
-    Pass this method a requests.Response object.  It returns True if
-    the response indicates success, None if it indicates temporary
+    Pass this method a numeric HTTP status code.  It returns True if
+    the code indicates success, None if it indicates temporary
     failure, and False otherwise.  You can use this as the
     success_check for a RetryLoop.
 
@@ -128,15 +129,11 @@ def check_http_response_success(result):
       3xx status codes.  They don't indicate success, and you can't
       retry those requests verbatim.
     """
-    try:
-        status = result.status_code
-    except Exception:
-        return None
-    if status in _HTTP_SUCCESSES:
+    if status_code in _HTTP_SUCCESSES:
         return True
-    elif status in _HTTP_CAN_RETRY:
+    elif status_code in _HTTP_CAN_RETRY:
         return None
-    elif 100 <= status < 600:
+    elif 100 <= status_code < 600:
         return False
     else:
         return None  # Get well soon, server.
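
A quick usage sketch (not part of the commit): callers can now pass the integer from curl.getinfo(pycurl.RESPONSE_CODE) straight in. The codes shown, and the flags they map to, follow the behavior exercised by the tests below.

    from arvados.retry import check_http_response_success

    assert check_http_response_success(200) is True    # success
    assert check_http_response_success(500) is None    # temporary failure: retry
    assert check_http_response_success(404) is False   # permanent failure: give up
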
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index ca28025fea64ae6b388af4388337814494a8c0c8..f27c28df11b475dc941bfb76aec4e37b57840def 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -26,23 +26,24 @@ setup(name='arvados-python-client',
       license='Apache 2.0',
       packages=find_packages(),
       scripts=[
-        'bin/arv-copy',
-        'bin/arv-get',
-        'bin/arv-keepdocker',
-        'bin/arv-ls',
-        'bin/arv-normalize',
-        'bin/arv-put',
-        'bin/arv-run',
-        'bin/arv-ws'
-        ],
+          'bin/arv-copy',
+          'bin/arv-get',
+          'bin/arv-keepdocker',
+          'bin/arv-ls',
+          'bin/arv-normalize',
+          'bin/arv-put',
+          'bin/arv-run',
+          'bin/arv-ws'
+      ],
       install_requires=[
-        'python-gflags',
-        'google-api-python-client',
-        'httplib2',
-        'requests>=2.4',
-        'urllib3',
-        'ws4py'
-        ],
+          'google-api-python-client',
+          'httplib2',
+          'pycurl',
+          'python-gflags',
+          'requests>=2.4',
+          'urllib3',
+          'ws4py'
+      ],
       test_suite='tests',
       tests_require=['mock>=1.0', 'PyYAML'],
       zip_safe=False,
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index a10802ae81b7b2bf439f8452052a0ba1e9643617..b4e97f6e5a062a7b9232e6ed2dce0fc6b1ea0e30 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -8,8 +8,8 @@ import httplib2
 import io
 import mock
 import os
+import pycurl
 import Queue
-import requests
 import shutil
 import tempfile
 import unittest
@@ -43,44 +43,80 @@ def mock_responses(body, *codes, **headers):
     return mock.patch('httplib2.Http.request', side_effect=queue_with((
         (fake_httplib2_response(code, **headers), body) for code in codes)))
 
-# fake_requests_response, mock_get_responses and mock_put_responses
-# mock calls to requests.get() and requests.put()
-def fake_requests_response(code, body, **headers):
-    r = requests.Response()
-    r.status_code = code
-    r.reason = httplib.responses.get(code, "Unknown Response")
-    r.headers = headers
-    r.raw = io.BytesIO(body)
-    return r
-
-# The following methods patch requests.Session(), where return_value is a mock
-# Session object.  The put/get attributes are set on mock Session, and the
-# desired put/get behavior is set on the put/get mocks.
-
-def mock_put_responses(body, *codes, **headers):
-    m = mock.MagicMock()
+
+class FakeCurl:
+    @classmethod
+    def make(cls, code, body='', headers={}):
+        return mock.Mock(spec=cls, wraps=cls(code, body, headers))
+
+    def __init__(self, code=200, body='', headers={}):
+        self._opt = {}
+        self._got_url = None
+        self._writer = None
+        self._headerfunction = None
+        self._resp_code = code
+        self._resp_body = body
+        self._resp_headers = headers
+
+    def getopt(self, opt):
+        return self._opt.get(str(opt), None)
+
+    def setopt(self, opt, val):
+        self._opt[str(opt)] = val
+        if opt == pycurl.WRITEDATA:
+            self._writer = val
+        elif opt == pycurl.HEADERFUNCTION:
+            self._headerfunction = val
+
+    def perform(self):
+        if not isinstance(self._resp_code, int):
+            raise self._resp_code
+        if self.getopt(pycurl.URL) is None:
+            raise ValueError
+        if self._writer is None:
+            raise ValueError
+        if self._headerfunction:
+            self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
+            for k, v in self._resp_headers.iteritems():
+                self._headerfunction(k + ': ' + str(v))
+        self._writer.write(self._resp_body)
+
+    def close(self):
+        pass
+
+    def reset(self):
+        """Prevent fake UAs from going back into the user agent pool."""
+        raise Exception
+
+    def getinfo(self, opt):
+        if opt == pycurl.RESPONSE_CODE:
+            return self._resp_code
+        raise Exception
+
+def mock_keep_responses(body, *codes, **headers):
+    """Patch pycurl to return fake responses and raise exceptions.
+
+    body can be a string to return as the response body; an exception
+    to raise when perform() is called; or an iterable that returns a
+    sequence of such values.
+    """
+    cm = mock.MagicMock()
     if isinstance(body, tuple):
         codes = list(codes)
         codes.insert(0, body)
-        m.return_value.put.side_effect = queue_with((fake_requests_response(code, b, **headers) for b, code in codes))
+        responses = [
+            FakeCurl.make(code=code, body=b, headers=headers)
+            for b, code in codes
+        ]
     else:
-        m.return_value.put.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes))
-    return mock.patch('requests.Session', m)
-
-def mock_get_responses(body, *codes, **headers):
-    m = mock.MagicMock()
-    m.return_value.get.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes))
-    return mock.patch('requests.Session', m)
-
-def mock_get(side_effect):
-    m = mock.MagicMock()
-    m.return_value.get.side_effect = side_effect
-    return mock.patch('requests.Session', m)
-
-def mock_put(side_effect):
-    m = mock.MagicMock()
-    m.return_value.put.side_effect = side_effect
-    return mock.patch('requests.Session', m)
+        responses = [
+            FakeCurl.make(code=code, body=body, headers=headers)
+            for code in codes
+        ]
+    cm.side_effect = queue_with(responses)
+    cm.responses = responses
+    return mock.patch('pycurl.Curl', cm)
+
 
 class MockStreamReader(object):
     def __init__(self, name='.', *data):
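
A hedged sketch (not in the commit) of how a test can lean on the new helper: mock_keep_responses patches pycurl.Curl, so the client's retry logic runs without any network traffic. The test class name, service count, and data are illustrative; the fixtures (ApiClientMock, skip_sleep) are the same ones used by the real tests below.

    import unittest

    import arvados
    import arvados_testutil as tutil

    class ExampleKeepRetryTest(unittest.TestCase, tutil.ApiClientMock):
        def test_get_retries_through_server_errors(self):
            api_client = self.mock_keep_services(count=2)
            keep_client = arvados.KeepClient(api_client=api_client, num_retries=3)
            # Two 500s followed by a 200; the locator is md5('foo') plus its length.
            with tutil.mock_keep_responses('foo', 500, 500, 200), tutil.skip_sleep:
                self.assertEqual(
                    'foo', keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3'))
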
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index d3198be473f6819006bcbb4c1c6ddda1a53c9361..a397f44fa36c4abf005e7b7b30f520a5cffaa5f9 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -551,7 +551,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
     def test_locator_init(self):
         client = self.api_client_mock(200)
         # Ensure Keep will not return anything if asked.
-        with tutil.mock_get_responses(None, 404):
+        with tutil.mock_keep_responses(None, 404):
             reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
                                               api_client=client)
             self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
@@ -561,7 +561,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
         # been written to Keep.
         client = self.api_client_mock(200)
         self.mock_get_collection(client, 404, None)
-        with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+        with tutil.mock_keep_responses(self.DEFAULT_MANIFEST, 200):
             reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
                                               api_client=client)
             self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
@@ -569,7 +569,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
     def test_uuid_init_no_fallback_to_keep(self):
         # Do not look up a collection UUID in Keep.
         client = self.api_client_mock(404)
-        with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+        with tutil.mock_keep_responses(self.DEFAULT_MANIFEST, 200):
             with self.assertRaises(arvados.errors.ApiError):
                 reader = arvados.CollectionReader(self.DEFAULT_UUID,
                                                   api_client=client)
@@ -578,7 +578,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
         # To verify that CollectionReader tries Keep first here, we
         # mock API server to return the wrong data.
         client = self.api_client_mock(200)
-        with tutil.mock_get_responses(self.ALT_MANIFEST, 200):
+        with tutil.mock_keep_responses(self.ALT_MANIFEST, 200):
             self.assertEqual(
                 self.ALT_MANIFEST,
                 arvados.CollectionReader(
@@ -590,7 +590,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
         client = self.api_client_mock(200)
         reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
                                           num_retries=3)
-        with tutil.mock_get_responses('foo', 500, 500, 200):
+        with tutil.mock_keep_responses('foo', 500, 500, 200):
             self.assertEqual('foo',
                              ''.join(f.read(9) for f in reader.all_files()))
 
@@ -630,7 +630,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
     def test_api_response_with_collection_from_keep(self):
         client = self.api_client_mock()
         self.mock_get_collection(client, 404, 'foo')
-        with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+        with tutil.mock_keep_responses(self.DEFAULT_MANIFEST, 200):
             reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
                                               api_client=client)
             api_response = reader.api_response()
@@ -673,7 +673,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
 class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
     def mock_keep(self, body, *codes, **headers):
         headers.setdefault('x-keep-replicas-stored', 2)
-        return tutil.mock_put_responses(body, *codes, **headers)
+        return tutil.mock_keep_responses(body, *codes, **headers)
 
     def foo_writer(self, **kwargs):
         kwargs.setdefault('api_client', self.api_client_mock())
@@ -695,7 +695,7 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
 
     def test_write_insufficient_replicas_via_proxy(self):
         writer = self.foo_writer(replication=3)
-        with self.mock_keep(None, 200, headers={'x-keep-replicas-stored': 2}):
+        with self.mock_keep(None, 200, **{'x-keep-replicas-stored': 2}):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 writer.manifest_text()
 
@@ -717,10 +717,7 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             self.mock_keep_services(client, status=200, service_type='disk', count=6)
             writer = self.foo_writer(api_client=client, replication=3)
             writer.manifest_text()
-            # keepmock is the mock session constructor; keepmock.return_value
-            # is the mock session object, and keepmock.return_value.put is the
-            # actual mock method of interest.
-            self.assertEqual(6, keepmock.return_value.put.call_count)
+            self.assertEqual(6, keepmock.call_count)
 
     def test_write_whole_collection_through_retries(self):
         writer = self.foo_writer(num_retries=2)
diff --git a/sdk/python/tests/test_errors.py b/sdk/python/tests/test_errors.py
index 6e8df96316aa36510828515d08cb7c3562dcfe99..db875dc212a1567638ad82155817078e0da48773 100644
--- a/sdk/python/tests/test_errors.py
+++ b/sdk/python/tests/test_errors.py
@@ -10,8 +10,8 @@ class KeepRequestErrorTestCase(unittest.TestCase):
     REQUEST_ERRORS = [
         ('http://keep1.zzzzz.example.org/', IOError("test IOError")),
         ('http://keep3.zzzzz.example.org/', MemoryError("test MemoryError")),
-        ('http://keep5.zzzzz.example.org/', tutil.fake_requests_response(
-                500, "test 500")),
+        ('http://keep5.zzzzz.example.org/',
+         arv_error.HttpError(500, "Internal Server Error")),
         ('http://keep7.zzzzz.example.org/', IOError("second test IOError")),
         ]
 
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index be13c55048a97d640f95d91e4e787824814b7f62..0c42c2f18cdf1221bf14c18483f9393e4309d66b 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,6 +1,7 @@
 import hashlib
 import mock
 import os
+import pycurl
 import random
 import re
 import socket
@@ -273,57 +274,59 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def test_get_timeout(self):
         api_client = self.mock_keep_services(count=1)
-        force_timeout = [socket.timeout("timed out")]
-        with tutil.mock_get(force_timeout) as mock_session:
+        force_timeout = socket.timeout("timed out")
+        with tutil.mock_keep_responses(force_timeout, 0) as mock:
             keep_client = arvados.KeepClient(api_client=api_client)
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.get('ffffffffffffffffffffffffffffffff')
-            self.assertTrue(mock_session.return_value.get.called)
             self.assertEqual(
-                arvados.KeepClient.DEFAULT_TIMEOUT,
-                mock_session.return_value.get.call_args[1]['timeout'])
+                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
 
     def test_put_timeout(self):
         api_client = self.mock_keep_services(count=1)
-        force_timeout = [socket.timeout("timed out")]
-        with tutil.mock_put(force_timeout) as mock_session:
+        force_timeout = socket.timeout("timed out")
+        with tutil.mock_keep_responses(force_timeout, 0) as mock:
             keep_client = arvados.KeepClient(api_client=api_client)
             with self.assertRaises(arvados.errors.KeepWriteError):
                 keep_client.put('foo')
-            self.assertTrue(mock_session.return_value.put.called)
             self.assertEqual(
-                arvados.KeepClient.DEFAULT_TIMEOUT,
-                mock_session.return_value.put.call_args[1]['timeout'])
+                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
 
     def test_proxy_get_timeout(self):
-        # Force a timeout, verifying that the requests.get or
-        # requests.put method was called with the proxy_timeout
-        # setting rather than the default timeout.
         api_client = self.mock_keep_services(service_type='proxy', count=1)
-        force_timeout = [socket.timeout("timed out")]
-        with tutil.mock_get(force_timeout) as mock_session:
+        force_timeout = socket.timeout("timed out")
+        with tutil.mock_keep_responses(force_timeout, 0) as mock:
             keep_client = arvados.KeepClient(api_client=api_client)
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.get('ffffffffffffffffffffffffffffffff')
-            self.assertTrue(mock_session.return_value.get.called)
             self.assertEqual(
-                arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
-                mock_session.return_value.get.call_args[1]['timeout'])
+                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
 
     def test_proxy_put_timeout(self):
-        # Force a timeout, verifying that the requests.get or
-        # requests.put method was called with the proxy_timeout
-        # setting rather than the default timeout.
         api_client = self.mock_keep_services(service_type='proxy', count=1)
-        force_timeout = [socket.timeout("timed out")]
-        with tutil.mock_put(force_timeout) as mock_session:
+        force_timeout = socket.timeout("timed out")
+        with tutil.mock_keep_responses(force_timeout, 0) as mock:
             keep_client = arvados.KeepClient(api_client=api_client)
             with self.assertRaises(arvados.errors.KeepWriteError):
                 keep_client.put('foo')
-            self.assertTrue(mock_session.return_value.put.called)
             self.assertEqual(
-                arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
-                mock_session.return_value.put.call_args[1]['timeout'])
+                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
 
     def test_probe_order_reference_set(self):
         # expected_order[i] is the probe order for
@@ -397,9 +400,9 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         aport = random.randint(1024,65535)
         api_client = self.mock_keep_services(service_port=aport, count=16)
         keep_client = arvados.KeepClient(api_client=api_client)
-        with mock.patch('requests.' + verb,
-                        side_effect=socket.timeout) as req_mock, \
-                self.assertRaises(exc_class) as err_check:
+        with mock.patch('pycurl.Curl') as curl_mock, \
+             self.assertRaises(exc_class) as err_check:
+            curl_mock.return_value.side_effect = socket.timeout
             getattr(keep_client, verb)(data)
         urls = [urlparse.urlparse(url)
                 for url in err_check.exception.request_errors()]
@@ -429,7 +432,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def check_errors_from_last_retry(self, verb, exc_class):
         api_client = self.mock_keep_services(count=2)
-        req_mock = getattr(tutil, 'mock_{}_responses'.format(verb))(
+        req_mock = tutil.mock_keep_responses(
             "retry error reporting test", 500, 500, 403, 403)
         with req_mock, tutil.skip_sleep, \
                 self.assertRaises(exc_class) as err_check:
@@ -450,7 +453,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         data = 'partial failure test'
         data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
         api_client = self.mock_keep_services(count=3)
-        with tutil.mock_put_responses(data_loc, 200, 500, 500) as req_mock, \
+        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
             keep_client = arvados.KeepClient(api_client=api_client)
             keep_client.put(data)
@@ -468,28 +471,31 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
                 'service_type': 'gateway:test',
         } for i in range(gateways)]
         self.gateway_roots = [
-            "https://[{service_host}]:{service_port}/".format(**gw)
+            "https://{service_host}:{service_port}/".format(**gw)
             for gw in self.gateways]
         self.api_client = self.mock_keep_services(
             count=disks, additional_services=self.gateways)
         self.keepClient = arvados.KeepClient(api_client=self.api_client)
 
-    @mock.patch('requests.Session')
-    def test_get_with_gateway_hint_first(self, MockSession):
-        MockSession.return_value.get.return_value = tutil.fake_requests_response(
+    @mock.patch('pycurl.Curl')
+    def test_get_with_gateway_hint_first(self, MockCurl):
+        MockCurl.return_value = tutil.FakeCurl.make(
             code=200, body='foo', headers={'Content-Length': 3})
         self.mock_disks_and_gateways()
         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
         self.assertEqual('foo', self.keepClient.get(locator))
-        self.assertEqual((self.gateway_roots[0]+locator,),
-                         MockSession.return_value.get.call_args_list[0][0])
+        self.assertEqual(self.gateway_roots[0]+locator,
+                         MockCurl.return_value.getopt(pycurl.URL))
 
-    @mock.patch('requests.Session')
-    def test_get_with_gateway_hints_in_order(self, MockSession):
+    @mock.patch('pycurl.Curl')
+    def test_get_with_gateway_hints_in_order(self, MockCurl):
         gateways = 4
         disks = 3
-        MockSession.return_value.get.return_value = tutil.fake_requests_response(
-            code=404, body='')
+        mocks = [
+            tutil.FakeCurl.make(code=404, body='')
+            for _ in range(gateways+disks)
+        ]
+        MockCurl.side_effect = tutil.queue_with(mocks)
         self.mock_disks_and_gateways(gateways=gateways, disks=disks)
         locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                            ['K@'+gw['uuid'] for gw in self.gateways])
@@ -497,23 +503,23 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
             self.keepClient.get(locator)
         # Gateways are tried first, in the order given.
         for i, root in enumerate(self.gateway_roots):
-            self.assertEqual((root+locator,),
-                             MockSession.return_value.get.call_args_list[i][0])
+            self.assertEqual(root+locator,
+                             mocks[i].getopt(pycurl.URL))
         # Disk services are tried next.
         for i in range(gateways, gateways+disks):
             self.assertRegexpMatches(
-                MockSession.return_value.get.call_args_list[i][0][0],
+                mocks[i].getopt(pycurl.URL),
                 r'keep0x')
 
-    @mock.patch('requests.Session')
-    def test_get_with_remote_proxy_hint(self, MockSession):
-        MockSession.return_value.get.return_value = tutil.fake_requests_response(
+    @mock.patch('pycurl.Curl')
+    def test_get_with_remote_proxy_hint(self, MockCurl):
+        MockCurl.return_value = tutil.FakeCurl.make(
             code=200, body='foo', headers={'Content-Length': 3})
         self.mock_disks_and_gateways()
         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
         self.assertEqual('foo', self.keepClient.get(locator))
-        self.assertEqual(('https://keep.xyzzy.arvadosapi.com/'+locator,),
-                         MockSession.return_value.get.call_args_list[0][0])
+        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
+                         MockCurl.return_value.getopt(pycurl.URL))
 
 
 class KeepClientRetryTestMixin(object):
@@ -587,14 +593,14 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
-    TEST_PATCHER = staticmethod(tutil.mock_get_responses)
+    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
 
     def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                    *args, **kwargs):
         return self.new_client().get(locator, *args, **kwargs)
 
     def test_specific_exception_when_not_found(self):
-        with tutil.mock_get_responses(self.DEFAULT_EXPECT, 404, 200):
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
             self.check_exception(arvados.errors.NotFoundError, num_retries=3)
 
     def test_general_exception_with_mixed_errors(self):
@@ -603,7 +609,7 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
         # This test rigs up 50/50 disagreement between two servers, and
         # checks that it does not become a NotFoundError.
         client = self.new_client()
-        with tutil.mock_get_responses(self.DEFAULT_EXPECT, 404, 500):
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
             with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                 client.get(self.HINTED_LOCATOR)
             self.assertNotIsInstance(
@@ -611,17 +617,19 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
                 "mixed errors raised NotFoundError")
 
     def test_hint_server_can_succeed_without_retries(self):
-        with tutil.mock_get_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
             self.check_success(locator=self.HINTED_LOCATOR)
 
     def test_try_next_server_after_timeout(self):
-        with tutil.mock_get([
-                socket.timeout("timed out"),
-                tutil.fake_requests_response(200, self.DEFAULT_EXPECT)]):
+        with tutil.mock_keep_responses(
+                (socket.timeout("timed out"), 200),
+                (self.DEFAULT_EXPECT, 200)):
             self.check_success(locator=self.HINTED_LOCATOR)
 
     def test_retry_data_with_wrong_checksum(self):
-        with tutil.mock_get((tutil.fake_requests_response(200, s) for s in ['baddata', self.TEST_DATA])):
+        with tutil.mock_keep_responses(
+                ('baddata', 200),
+                (self.DEFAULT_EXPECT, 200)):
             self.check_success(locator=self.HINTED_LOCATOR)
 
 
@@ -629,12 +637,12 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
 class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
     DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
-    TEST_PATCHER = staticmethod(tutil.mock_put_responses)
+    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
 
     def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                    copies=1, *args, **kwargs):
         return self.new_client().put(data, copies, *args, **kwargs)
 
     def test_do_not_send_multiple_copies_to_same_server(self):
-        with tutil.mock_put_responses(self.DEFAULT_EXPECT, 200):
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
             self.check_exception(copies=2, num_retries=3)
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index 0c1110c5ceb10e5164d48db9c03bd2e74f8b9639..4f147ba54c01ab3975addb8f66386dee9c61656d 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -7,8 +7,6 @@ import arvados.errors as arv_error
 import arvados.retry as arv_retry
 import mock
 
-from arvados_testutil import fake_requests_response
-
 class RetryLoopTestMixin(object):
     @staticmethod
     def loop_success(result):
@@ -150,8 +148,7 @@ class RetryLoopBackoffTestCase(unittest.TestCase, RetryLoopTestMixin):
 class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
     def results_map(self, *codes):
         for code in codes:
-            response = fake_requests_response(code, None)
-            yield code, arv_retry.check_http_response_success(response)
+            yield code, arv_retry.check_http_response_success(code)
 
     def check(assert_name):
         def check_method(self, expected, *codes):
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index acb9929cae3ce90f9207d4e0754bd66cc57f5f2c..e90f6025a3b203ff7c1f3fe08a86186e57ef5dd7 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -199,47 +199,47 @@ class StreamRetryTestMixin(object):
 
     @tutil.skip_sleep
     def test_success_without_retries(self):
-        with tutil.mock_get_responses('bar', 200):
+        with tutil.mock_keep_responses('bar', 200):
             reader = self.reader_for('bar_file')
             self.assertEqual('bar', self.read_for_test(reader, 3))
 
     @tutil.skip_sleep
     def test_read_no_default_retry(self):
-        with tutil.mock_get_responses('', 500):
+        with tutil.mock_keep_responses('', 500):
             reader = self.reader_for('user_agreement')
             with self.assertRaises(arvados.errors.KeepReadError):
                 self.read_for_test(reader, 10)
 
     @tutil.skip_sleep
     def test_read_with_instance_retries(self):
-        with tutil.mock_get_responses('foo', 500, 200):
+        with tutil.mock_keep_responses('foo', 500, 200):
             reader = self.reader_for('foo_file', num_retries=3)
             self.assertEqual('foo', self.read_for_test(reader, 3))
 
     @tutil.skip_sleep
     def test_read_with_method_retries(self):
-        with tutil.mock_get_responses('foo', 500, 200):
+        with tutil.mock_keep_responses('foo', 500, 200):
             reader = self.reader_for('foo_file')
             self.assertEqual('foo',
                              self.read_for_test(reader, 3, num_retries=3))
 
     @tutil.skip_sleep
     def test_read_instance_retries_exhausted(self):
-        with tutil.mock_get_responses('bar', 500, 500, 500, 500, 200):
+        with tutil.mock_keep_responses('bar', 500, 500, 500, 500, 200):
             reader = self.reader_for('bar_file', num_retries=3)
             with self.assertRaises(arvados.errors.KeepReadError):
                 self.read_for_test(reader, 3)
 
     @tutil.skip_sleep
     def test_read_method_retries_exhausted(self):
-        with tutil.mock_get_responses('bar', 500, 500, 500, 500, 200):
+        with tutil.mock_keep_responses('bar', 500, 500, 500, 500, 200):
             reader = self.reader_for('bar_file')
             with self.assertRaises(arvados.errors.KeepReadError):
                 self.read_for_test(reader, 3, num_retries=3)
 
     @tutil.skip_sleep
     def test_method_retries_take_precedence(self):
-        with tutil.mock_get_responses('', 500, 500, 500, 200):
+        with tutil.mock_keep_responses('', 500, 500, 500, 200):
             reader = self.reader_for('user_agreement', num_retries=10)
             with self.assertRaises(arvados.errors.KeepReadError):
                 self.read_for_test(reader, 10, num_retries=1)