3857: implement proxy_timeout
[arvados.git] / sdk / python / arvados / keep.py
index 7621a9608e94ec99400ab0df87e812ab223a157a..d92758062dce05d30a7dc8775157fab3761469b7 100644 (file)
@@ -1,6 +1,4 @@
 import gflags
-import httplib
-import httplib2
 import logging
 import os
 import pprint
@@ -21,6 +19,7 @@ import timer
 import datetime
 import ssl
 import socket
+import requests
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -165,7 +164,7 @@ class KeepBlockCache(object):
             self.ready.set()
 
         def size(self):
-            if self.content == None:
+            if self.content is None:
                 return 0
             else:
                 return len(self.content)
@@ -210,6 +209,14 @@ class KeepBlockCache(object):
             self._cache_lock.release()
 
 class KeepClient(object):
+
+    # Default Keep server connection timeout:  3 seconds
+    # Default Keep server read timeout:       30 seconds
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:        60 seconds
+    DEFAULT_TIMEOUT = (3, 30)
+    DEFAULT_PROXY_TIMEOUT = (20, 60)
+
     class ThreadLimiter(object):
         """
         Limit the number of threads running at a given time to
@@ -269,10 +276,10 @@ class KeepClient(object):
 
     class KeepService(object):
         # Make requests to a single Keep service, and track results.
-        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+        HTTP_ERRORS = (requests.exceptions.RequestException,
                        socket.error, ssl.SSLError)
 
-        def __init__(self, root, **headers):
+        def __init__(self, root, timeout=None, **headers):
             self.root = root
             self.last_result = None
             self.success_flag = None
@@ -288,19 +295,19 @@ class KeepClient(object):
 
         def last_status(self):
             try:
-                return int(self.last_result[0].status)
+                return self.last_result.status_code
             except (AttributeError, IndexError, ValueError):
                 return None
 
-        def get(self, http, locator):
-            # http is an httplib2.Http object.
+        def get(self, locator, timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
             try:
                 with timer.Timer() as t:
-                    result = http.request(url.encode('utf-8'), 'GET',
-                                          headers=self.get_headers)
+                    result = requests.get(url.encode('utf-8'),
+                                          headers=self.get_headers,
+                                          timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(e), str(e))
@@ -308,7 +315,7 @@ class KeepClient(object):
             else:
                 self.last_result = result
                 self.success_flag = retry.check_http_response_success(result)
-                content = result[1]
+                content = result.content
                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                              self.last_status(), len(content), t.msecs,
                              (len(content)/(1024.0*1024))/t.secs)
@@ -316,15 +323,18 @@ class KeepClient(object):
                     resp_md5 = hashlib.md5(content).hexdigest()
                     if resp_md5 == locator.md5sum:
                         return content
-                    _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+                    _logger.warning("Checksum fail: md5(%s) = %s",
+                                    url, resp_md5)
             return None
 
-        def put(self, http, hash_s, body):
+        def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             try:
-                result = http.request(url.encode('utf-8'), 'PUT',
-                                      headers=self.put_headers, body=body)
+                result = requests.put(url.encode('utf-8'),
+                                      data=body,
+                                      headers=self.put_headers,
+                                      timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(e), str(e))
@@ -365,12 +375,13 @@ class KeepClient(object):
                           str(threading.current_thread()),
                           self.args['data_hash'],
                           self.args['service_root'])
-            h = httplib2.Http(timeout=self.args.get('timeout', None))
             self._success = bool(self.service.put(
-                    h, self.args['data_hash'], self.args['data']))
+                self.args['data_hash'],
+                self.args['data'],
+                timeout=self.args.get('timeout', None)))
             status = self.service.last_status()
             if self._success:
-                resp, body = self.service.last_result
+                result = self.service.last_result
                 _logger.debug("KeepWriterThread %s succeeded %s %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
@@ -380,18 +391,20 @@ class KeepClient(object):
                 # we're talking to a proxy or other backend that
                 # stores to multiple copies for us.
                 try:
-                    replicas_stored = int(resp['x-keep-replicas-stored'])
+                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
                 except (KeyError, ValueError):
                     replicas_stored = 1
-                limiter.save_response(body.strip(), replicas_stored)
+                limiter.save_response(result.text.strip(), replicas_stored)
             elif status is not None:
                 _logger.debug("Request fail: PUT %s => %s %s",
                               self.args['data_hash'], status,
-                              self.service.last_result[1])
+                              self.service.last_result.text)
 
 
-    def __init__(self, api_client=None, proxy=None, timeout=300,
-                 api_token=None, local_store=None, block_cache=None):
+    def __init__(self, api_client=None, proxy=None,
+                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
+                 api_token=None, local_store=None, block_cache=None,
+                 num_retries=0):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -402,8 +415,14 @@ class KeepClient(object):
           Keep proxy.  Otherwise, KeepClient will fall back to the setting
           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
           ensure KeepClient does not use a proxy, pass in an empty string.
-        * timeout: The timeout for all HTTP requests, in seconds.  Default
-          300.
+        * timeout: The timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout): see
+          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Default: (3, 30).
+        * proxy_timeout: The timeout (in seconds) for HTTP requests to
+          Keep proxies. A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout). Default: (20, 60).
         * api_token: If you're not using an API client, but only talking
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
@@ -415,20 +434,25 @@ class KeepClient(object):
           environment variable.  If you want to ensure KeepClient does not
           use local storage, pass in an empty string.  This is primarily
           intended to mock a server for testing.
+        * num_retries: The default number of times to retry failed requests.
+          This will be used as the default num_retries value when get() and
+          put() are called.  Default 0.
+
         """
         self.lock = threading.Lock()
         if proxy is None:
             proxy = config.get('ARVADOS_KEEP_PROXY')
         if api_token is None:
-            api_token = config.get('ARVADOS_API_TOKEN')
+            if api_client is None:
+                api_token = config.get('ARVADOS_API_TOKEN')
+            else:
+                api_token = api_client.api_token
         elif api_client is not None:
             raise ValueError(
                 "can't build KeepClient with both API client and token")
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
-        if block_cache is None:
-            raise Exception()
         self.block_cache = block_cache if block_cache else KeepBlockCache()
 
         if local_store:
@@ -436,14 +460,14 @@ class KeepClient(object):
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
-            self.timeout = timeout
-
+            self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
                 self.service_roots = [proxy]
                 self.using_proxy = True
+                self.timeout = proxy_timeout
                 self.static_service_roots = True
             else:
                 # It's important to avoid instantiating an API client
@@ -454,6 +478,7 @@ class KeepClient(object):
                 self.api_token = api_client.api_token
                 self.service_roots = None
                 self.using_proxy = None
+                self.timeout = timeout
                 self.static_service_roots = False
 
     def build_service_roots(self, force_rebuild=False):
@@ -470,14 +495,14 @@ class KeepClient(object):
             if not keep_services:
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = (keep_services[0].get('service_type') ==
-                                'proxy')
+            self.using_proxy = any(ks.get('service_type') == 'proxy'
+                                   for ks in keep_services)
 
-            roots = (("http%s://%s:%d/" %
-                      ('s' if f['service_ssl_flag'] else '',
-                       f['service_host'],
-                       f['service_port']))
-                     for f in keep_services)
+            roots = ("{}://[{}]:{:d}/".format(
+                        'https' if ks['service_ssl_flag'] else 'http',
+                         ks['service_host'],
+                         ks['service_port'])
+                     for ks in keep_services)
             self.service_roots = sorted(set(roots))
             _logger.debug(str(self.service_roots))
 
@@ -559,7 +584,8 @@ class KeepClient(object):
         else:
             return None
 
-    def get(self, loc_s, num_retries=0):
+    @retry.retry_method
+    def get(self, loc_s, num_retries=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -576,7 +602,8 @@ class KeepClient(object):
           *each* Keep server if it returns temporary failures, with
           exponential backoff.  Note that, in each loop, the method may try
           to fetch data from every available Keep service, along with any
-          that are named in location hints in the locator.  Default 0.
+          that are named in location hints in the locator.  The default value
+          is set when the KeepClient is initialized.
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
@@ -614,9 +641,8 @@ class KeepClient(object):
             services_to_try = [roots_map[root]
                                for root in (local_roots + hint_roots)
                                if roots_map[root].usable()]
-            http = httplib2.Http(timeout=self.timeout)
             for keep_service in services_to_try:
-                blob = keep_service.get(http, locator)
+                blob = keep_service.get(locator, timeout=self.timeout)
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
@@ -639,7 +665,8 @@ class KeepClient(object):
         else:
             raise arvados.errors.KeepReadError(loc_s)
 
-    def put(self, data, copies=2, num_retries=0):
+    @retry.retry_method
+    def put(self, data, copies=2, num_retries=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -654,7 +681,8 @@ class KeepClient(object):
           Default 2.
         * num_retries: The number of times to retry PUT requests to
           *each* Keep server if it returns temporary failures, with
-          exponential backoff.  Default 0.
+          exponential backoff.  The default value is set when the
+          KeepClient is initialized.
         """
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
@@ -700,7 +728,10 @@ class KeepClient(object):
             "Write fail for %s: wanted %d but wrote %d" %
             (data_hash, copies, thread_limiter.done()))
 
-    def local_store_put(self, data):
+    # Local storage methods need no-op num_retries arguments to keep
+    # integration tests happy.  With better isolation they could
+    # probably be removed again.
+    def local_store_put(self, data, num_retries=0):
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -708,7 +739,8 @@ class KeepClient(object):
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
         return locator
-    def local_store_get(self, loc_s):
+
+    def local_store_get(self, loc_s, num_retries=0):
         try:
             locator = KeepLocator(loc_s)
         except ValueError: