3857: implement proxy_timeout
[arvados.git] / sdk / python / arvados / keep.py
index 323251d3e911a7eda979f84f89e33be982a8c318..d92758062dce05d30a7dc8775157fab3761469b7 100644 (file)
@@ -1,6 +1,4 @@
 import gflags
-import httplib
-import httplib2
 import logging
 import os
 import pprint
@@ -21,6 +19,7 @@ import timer
 import datetime
 import ssl
 import socket
+import requests
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -165,7 +164,7 @@ class KeepBlockCache(object):
             self.ready.set()
 
         def size(self):
-            if self.content == None:
+            if self.content is None:
                 return 0
             else:
                 return len(self.content)
@@ -210,6 +209,14 @@ class KeepBlockCache(object):
             self._cache_lock.release()
 
 class KeepClient(object):
+
+    # Default Keep server connection timeout:  3 seconds
+    # Default Keep server read timeout:       30 seconds
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:        60 seconds
+    DEFAULT_TIMEOUT = (3, 30)
+    DEFAULT_PROXY_TIMEOUT = (20, 60)
+
     class ThreadLimiter(object):
         """
         Limit the number of threads running at a given time to
@@ -269,10 +276,10 @@ class KeepClient(object):
 
     class KeepService(object):
         # Make requests to a single Keep service, and track results.
-        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+        HTTP_ERRORS = (requests.exceptions.RequestException,
                        socket.error, ssl.SSLError)
 
-        def __init__(self, root, **headers):
+        def __init__(self, root, timeout=None, **headers):
             self.root = root
             self.last_result = None
             self.success_flag = None
@@ -288,19 +295,19 @@ class KeepClient(object):
 
         def last_status(self):
             try:
-                return int(self.last_result[0].status)
+                return self.last_result.status_code
             except (AttributeError, IndexError, ValueError):
                 return None
 
-        def get(self, http, locator):
-            # http is an httplib2.Http object.
+        def get(self, locator, timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
             try:
                 with timer.Timer() as t:
-                    result = http.request(url.encode('utf-8'), 'GET',
-                                          headers=self.get_headers)
+                    result = requests.get(url.encode('utf-8'),
+                                          headers=self.get_headers,
+                                          timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(e), str(e))
@@ -308,7 +315,7 @@ class KeepClient(object):
             else:
                 self.last_result = result
                 self.success_flag = retry.check_http_response_success(result)
-                content = result[1]
+                content = result.content
                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                              self.last_status(), len(content), t.msecs,
                              (len(content)/(1024.0*1024))/t.secs)
@@ -320,12 +327,14 @@ class KeepClient(object):
                                     url, resp_md5)
             return None
 
-        def put(self, http, hash_s, body):
+        def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             try:
-                result = http.request(url.encode('utf-8'), 'PUT',
-                                      headers=self.put_headers, body=body)
+                result = requests.put(url.encode('utf-8'),
+                                      data=body,
+                                      headers=self.put_headers,
+                                      timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(e), str(e))
@@ -366,12 +375,13 @@ class KeepClient(object):
                           str(threading.current_thread()),
                           self.args['data_hash'],
                           self.args['service_root'])
-            h = httplib2.Http(timeout=self.args.get('timeout', None))
             self._success = bool(self.service.put(
-                    h, self.args['data_hash'], self.args['data']))
+                self.args['data_hash'],
+                self.args['data'],
+                timeout=self.args.get('timeout', None)))
             status = self.service.last_status()
             if self._success:
-                resp, body = self.service.last_result
+                result = self.service.last_result
                 _logger.debug("KeepWriterThread %s succeeded %s %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
@@ -381,17 +391,18 @@ class KeepClient(object):
                 # we're talking to a proxy or other backend that
                 # stores to multiple copies for us.
                 try:
-                    replicas_stored = int(resp['x-keep-replicas-stored'])
+                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
                 except (KeyError, ValueError):
                     replicas_stored = 1
-                limiter.save_response(body.strip(), replicas_stored)
+                limiter.save_response(result.text.strip(), replicas_stored)
             elif status is not None:
                 _logger.debug("Request fail: PUT %s => %s %s",
                               self.args['data_hash'], status,
-                              self.service.last_result[1])
+                              self.service.last_result.text)
 
 
-    def __init__(self, api_client=None, proxy=None, timeout=300,
+    def __init__(self, api_client=None, proxy=None,
+                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
                  num_retries=0):
         """Initialize a new KeepClient.
@@ -404,8 +415,14 @@ class KeepClient(object):
           Keep proxy.  Otherwise, KeepClient will fall back to the setting
           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
           ensure KeepClient does not use a proxy, pass in an empty string.
-        * timeout: The timeout for all HTTP requests, in seconds.  Default
-          300.
+        * timeout: The timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout): see
+          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Default: (3, 30).
+        * proxy_timeout: The timeout (in seconds) for HTTP requests to
+          Keep proxies. A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout). Default: (20, 60).
         * api_token: If you're not using an API client, but only talking
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
@@ -420,12 +437,16 @@ class KeepClient(object):
         * num_retries: The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
           put() are called.  Default 0.
+
         """
         self.lock = threading.Lock()
         if proxy is None:
             proxy = config.get('ARVADOS_KEEP_PROXY')
         if api_token is None:
-            api_token = config.get('ARVADOS_API_TOKEN')
+            if api_client is None:
+                api_token = config.get('ARVADOS_API_TOKEN')
+            else:
+                api_token = api_client.api_token
         elif api_client is not None:
             raise ValueError(
                 "can't build KeepClient with both API client and token")
@@ -439,7 +460,6 @@ class KeepClient(object):
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
-            self.timeout = timeout
             self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
@@ -447,6 +467,7 @@ class KeepClient(object):
                 self.api_token = api_token
                 self.service_roots = [proxy]
                 self.using_proxy = True
+                self.timeout = proxy_timeout
                 self.static_service_roots = True
             else:
                 # It's important to avoid instantiating an API client
@@ -457,6 +478,7 @@ class KeepClient(object):
                 self.api_token = api_client.api_token
                 self.service_roots = None
                 self.using_proxy = None
+                self.timeout = timeout
                 self.static_service_roots = False
 
     def build_service_roots(self, force_rebuild=False):
@@ -473,14 +495,14 @@ class KeepClient(object):
             if not keep_services:
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = (keep_services[0].get('service_type') ==
-                                'proxy')
+            self.using_proxy = any(ks.get('service_type') == 'proxy'
+                                   for ks in keep_services)
 
-            roots = (("http%s://%s:%d/" %
-                      ('s' if f['service_ssl_flag'] else '',
-                       f['service_host'],
-                       f['service_port']))
-                     for f in keep_services)
+            roots = ("{}://[{}]:{:d}/".format(
+                        'https' if ks['service_ssl_flag'] else 'http',
+                         ks['service_host'],
+                         ks['service_port'])
+                     for ks in keep_services)
             self.service_roots = sorted(set(roots))
             _logger.debug(str(self.service_roots))
 
@@ -619,9 +641,8 @@ class KeepClient(object):
             services_to_try = [roots_map[root]
                                for root in (local_roots + hint_roots)
                                if roots_map[root].usable()]
-            http = httplib2.Http(timeout=self.timeout)
             for keep_service in services_to_try:
-                blob = keep_service.get(http, locator)
+                blob = keep_service.get(locator, timeout=self.timeout)
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))