3857: implement proxy_timeout
[arvados.git] / sdk / python / arvados / keep.py
index 651cec4fea65025895047140ec072b383cb096d2..d92758062dce05d30a7dc8775157fab3761469b7 100644 (file)
@@ -1,6 +1,4 @@
 import gflags
-import httplib
-import httplib2
 import logging
 import os
 import pprint
@@ -20,6 +18,8 @@ import threading
 import timer
 import datetime
 import ssl
+import socket
+import requests
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -142,8 +142,81 @@ class Keep(object):
     def put(data, **kwargs):
         return Keep.global_client_object().put(data, **kwargs)
 
+class KeepBlockCache(object):
+    """In-memory cache of Keep blocks, most recently used first."""
+    # Default RAM cache is 256MiB
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
+        self.cache_max = cache_max
+        self._cache = []
+        self._cache_lock = threading.Lock()
+
+    class CacheSlot(object):
+        """Holder for one block; get() blocks until set() is called."""
+        def __init__(self, locator):
+            self.locator = locator
+            self.ready = threading.Event()
+            self.content = None
+
+        def get(self):
+            self.ready.wait()
+            return self.content
+
+        def set(self, value):
+            # A value of None records a failed read; cap_cache() drops it.
+            self.content = value
+            self.ready.set()
+
+        def size(self):
+            if self.content is None:
+                return 0
+            return len(self.content)
+
+    def cap_cache(self):
+        '''Cap the cache size to self.cache_max by discarding failed
+        reads, then evicting ready slots from the least recently used
+        end until the total fits.'''
+        with self._cache_lock:
+            # Select all slots except those where ready.is_set() and content is
+            # None (that means there was an error reading the block).
+            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+            sm = sum(slot.size() for slot in self._cache)
+            while sm > self.cache_max:
+                # Evict the least recently used slot that is ready.
+                ready = [s for s in self._cache if s.ready.is_set()]
+                if not ready:
+                    # Nothing evictable: stop rather than spin under the lock.
+                    break
+                self._cache.remove(ready[-1])
+                sm = sum(slot.size() for slot in self._cache)
+
+    def reserve_cache(self, locator):
+        '''Reserve a cache slot for the specified locator, or return the
+        existing slot.  Returns (slot, created): created is True only
+        when a new slot was added.'''
+        with self._cache_lock:
+            # Test if the locator is already in the cache
+            for i, slot in enumerate(self._cache):
+                if slot.locator == locator:
+                    if i != 0:
+                        # move it to the front
+                        del self._cache[i]
+                        self._cache.insert(0, slot)
+                    return slot, False
+
+            # Add a new cache slot for the locator
+            slot = KeepBlockCache.CacheSlot(locator)
+            self._cache.insert(0, slot)
+            return slot, True
 
 class KeepClient(object):
+
+    # Default Keep server connection timeout:  3 seconds
+    # Default Keep server read timeout:       30 seconds
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:        60 seconds
+    DEFAULT_TIMEOUT = (3, 30)
+    DEFAULT_PROXY_TIMEOUT = (20, 60)
+
     class ThreadLimiter(object):
         """
         Limit the number of threads running at a given time to
@@ -203,10 +276,10 @@ class KeepClient(object):
 
     class KeepService(object):
         # Make requests to a single Keep service, and track results.
-        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
-                       ssl.SSLError)
+        HTTP_ERRORS = (requests.exceptions.RequestException,
+                       socket.error, ssl.SSLError)
 
-        def __init__(self, root, **headers):
+        def __init__(self, root, timeout=None, **headers):
             self.root = root
             self.last_result = None
             self.success_flag = None
@@ -222,19 +295,19 @@ class KeepClient(object):
 
         def last_status(self):
             try:
-                return int(self.last_result[0].status)
+                return self.last_result.status_code
             except (AttributeError, IndexError, ValueError):
                 return None
 
-        def get(self, http, locator):
-            # http is an httplib2.Http object.
+        def get(self, locator, timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
             try:
                 with timer.Timer() as t:
-                    result = http.request(url.encode('utf-8'), 'GET',
-                                          headers=self.get_headers)
+                    result = requests.get(url.encode('utf-8'),
+                                          headers=self.get_headers,
+                                          timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(e), str(e))
@@ -242,7 +315,7 @@ class KeepClient(object):
             else:
                 self.last_result = result
                 self.success_flag = retry.check_http_response_success(result)
-                content = result[1]
+                content = result.content
                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                              self.last_status(), len(content), t.msecs,
                              (len(content)/(1024.0*1024))/t.secs)
@@ -250,15 +323,18 @@ class KeepClient(object):
                     resp_md5 = hashlib.md5(content).hexdigest()
                     if resp_md5 == locator.md5sum:
                         return content
-                    _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+                    _logger.warning("Checksum fail: md5(%s) = %s",
+                                    url, resp_md5)
             return None
 
-        def put(self, http, hash_s, body):
+        def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             try:
-                result = http.request(url.encode('utf-8'), 'PUT',
-                                      headers=self.put_headers, body=body)
+                result = requests.put(url.encode('utf-8'),
+                                      data=body,
+                                      headers=self.put_headers,
+                                      timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(e), str(e))
@@ -299,12 +375,13 @@ class KeepClient(object):
                           str(threading.current_thread()),
                           self.args['data_hash'],
                           self.args['service_root'])
-            h = httplib2.Http(timeout=self.args.get('timeout', None))
             self._success = bool(self.service.put(
-                    h, self.args['data_hash'], self.args['data']))
+                self.args['data_hash'],
+                self.args['data'],
+                timeout=self.args.get('timeout', None)))
             status = self.service.last_status()
             if self._success:
-                resp, body = self.service.last_result
+                result = self.service.last_result
                 _logger.debug("KeepWriterThread %s succeeded %s %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
@@ -314,18 +391,20 @@ class KeepClient(object):
                 # we're talking to a proxy or other backend that
                 # stores to multiple copies for us.
                 try:
-                    replicas_stored = int(resp['x-keep-replicas-stored'])
+                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
                 except (KeyError, ValueError):
                     replicas_stored = 1
-                limiter.save_response(body.strip(), replicas_stored)
+                limiter.save_response(result.text.strip(), replicas_stored)
             elif status is not None:
                 _logger.debug("Request fail: PUT %s => %s %s",
                               self.args['data_hash'], status,
-                              self.service.last_result[1])
+                              self.service.last_result.text)
 
 
-    def __init__(self, api_client=None, proxy=None, timeout=60,
-                 api_token=None, local_store=None):
+    def __init__(self, api_client=None, proxy=None,
+                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
+                 api_token=None, local_store=None, block_cache=None,
+                 num_retries=0):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -336,8 +415,14 @@ class KeepClient(object):
           Keep proxy.  Otherwise, KeepClient will fall back to the setting
           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
           ensure KeepClient does not use a proxy, pass in an empty string.
-        * timeout: The timeout for all HTTP requests, in seconds.  Default
-          60.
+        * timeout: The timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout): see
+          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Default: (3, 30).
+        * proxy_timeout: The timeout (in seconds) for HTTP requests to
+          Keep proxies. A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout). Default: (20, 60).
         * api_token: If you're not using an API client, but only talking
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
@@ -349,33 +434,40 @@ class KeepClient(object):
           environment variable.  If you want to ensure KeepClient does not
           use local storage, pass in an empty string.  This is primarily
           intended to mock a server for testing.
+        * num_retries: The default number of times to retry failed requests.
+          This will be used as the default num_retries value when get() and
+          put() are called.  Default 0.
+
         """
         self.lock = threading.Lock()
         if proxy is None:
             proxy = config.get('ARVADOS_KEEP_PROXY')
         if api_token is None:
-            api_token = config.get('ARVADOS_API_TOKEN')
+            if api_client is None:
+                api_token = config.get('ARVADOS_API_TOKEN')
+            else:
+                api_token = api_client.api_token
         elif api_client is not None:
             raise ValueError(
                 "can't build KeepClient with both API client and token")
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
+        self.block_cache = block_cache if block_cache else KeepBlockCache()
+
         if local_store:
             self.local_store = local_store
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
-            self.timeout = timeout
-            self.cache_max = 256 * 1024 * 1024  # Cache is 256MiB
-            self._cache = []
-            self._cache_lock = threading.Lock()
+            self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
                 self.service_roots = [proxy]
                 self.using_proxy = True
+                self.timeout = proxy_timeout
                 self.static_service_roots = True
             else:
                 # It's important to avoid instantiating an API client
@@ -386,6 +478,7 @@ class KeepClient(object):
                 self.api_token = api_client.api_token
                 self.service_roots = None
                 self.using_proxy = None
+                self.timeout = timeout
                 self.static_service_roots = False
 
     def build_service_roots(self, force_rebuild=False):
@@ -402,14 +495,14 @@ class KeepClient(object):
             if not keep_services:
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = (keep_services[0].get('service_type') ==
-                                'proxy')
+            self.using_proxy = any(ks.get('service_type') == 'proxy'
+                                   for ks in keep_services)
 
-            roots = (("http%s://%s:%d/" %
-                      ('s' if f['service_ssl_flag'] else '',
-                       f['service_host'],
-                       f['service_port']))
-                     for f in keep_services)
+            roots = ("{}://[{}]:{:d}/".format(
+                        'https' if ks['service_ssl_flag'] else 'http',
+                         ks['service_host'],
+                         ks['service_port'])
+                     for ks in keep_services)
             self.service_roots = sorted(set(roots))
             _logger.debug(str(self.service_roots))
 
@@ -460,59 +553,6 @@ class KeepClient(object):
         _logger.debug(str(pseq))
         return pseq
 
-    class CacheSlot(object):
-        def __init__(self, locator):
-            self.locator = locator
-            self.ready = threading.Event()
-            self.content = None
-
-        def get(self):
-            self.ready.wait()
-            return self.content
-
-        def set(self, value):
-            self.content = value
-            self.ready.set()
-
-        def size(self):
-            if self.content == None:
-                return 0
-            else:
-                return len(self.content)
-
-    def cap_cache(self):
-        '''Cap the cache size to self.cache_max'''
-        self._cache_lock.acquire()
-        try:
-            self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
-            sm = sum([slot.size() for slot in self._cache])
-            while sm > self.cache_max:
-                del self._cache[-1]
-                sm = sum([slot.size() for a in self._cache])
-        finally:
-            self._cache_lock.release()
-
-    def reserve_cache(self, locator):
-        '''Reserve a cache slot for the specified locator,
-        or return the existing slot.'''
-        self._cache_lock.acquire()
-        try:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
-
-            # Add a new cache slot for the locator
-            n = KeepClient.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
-        finally:
-            self._cache_lock.release()
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
@@ -544,7 +584,8 @@ class KeepClient(object):
         else:
             return None
 
-    def get(self, loc_s, num_retries=0):
+    @retry.retry_method
+    def get(self, loc_s, num_retries=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -561,14 +602,15 @@ class KeepClient(object):
           *each* Keep server if it returns temporary failures, with
           exponential backoff.  Note that, in each loop, the method may try
           to fetch data from every available Keep service, along with any
-          that are named in location hints in the locator.  Default 0.
+          that are named in location hints in the locator.  The default value
+          is set when the KeepClient is initialized.
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
 
-        slot, first = self.reserve_cache(expect_hash)
+        slot, first = self.block_cache.reserve_cache(expect_hash)
         if not first:
             v = slot.get()
             return v
@@ -599,16 +641,15 @@ class KeepClient(object):
             services_to_try = [roots_map[root]
                                for root in (local_roots + hint_roots)
                                if roots_map[root].usable()]
-            http = httplib2.Http(timeout=self.timeout)
             for keep_service in services_to_try:
-                blob = keep_service.get(http, locator)
+                blob = keep_service.get(locator, timeout=self.timeout)
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
 
         # Always cache the result, then return it if we succeeded.
         slot.set(blob)
-        self.cap_cache()
+        self.block_cache.cap_cache()
         if loop.success():
             return blob
 
@@ -624,7 +665,8 @@ class KeepClient(object):
         else:
             raise arvados.errors.KeepReadError(loc_s)
 
-    def put(self, data, copies=2, num_retries=0):
+    @retry.retry_method
+    def put(self, data, copies=2, num_retries=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -639,7 +681,8 @@ class KeepClient(object):
           Default 2.
         * num_retries: The number of times to retry PUT requests to
           *each* Keep server if it returns temporary failures, with
-          exponential backoff.  Default 0.
+          exponential backoff.  The default value is set when the
+          KeepClient is initialized.
         """
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
@@ -685,7 +728,10 @@ class KeepClient(object):
             "Write fail for %s: wanted %d but wrote %d" %
             (data_hash, copies, thread_limiter.done()))
 
-    def local_store_put(self, data):
+    # Local storage methods need no-op num_retries arguments to keep
+    # integration tests happy.  With better isolation they could
+    # probably be removed again.
+    def local_store_put(self, data, num_retries=0):
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -694,7 +740,7 @@ class KeepClient(object):
                   os.path.join(self.local_store, md5))
         return locator
 
-    def local_store_get(self, loc_s):
+    def local_store_get(self, loc_s, num_retries=0):
         try:
             locator = KeepLocator(loc_s)
         except ValueError: