3857: implement proxy_timeout
[arvados.git] / sdk/python/arvados/keep.py
index e75d64e57f82c816f2fdf12882583740a74d7d8e..d92758062dce05d30a7dc8775157fab3761469b7 100644
@@ -1,6 +1,4 @@
 import gflags
-import httplib
-import httplib2
 import logging
 import os
 import pprint
@@ -20,6 +18,8 @@ import threading
 import timer
 import datetime
 import ssl
+import socket
+import requests
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -142,8 +142,81 @@ class Keep(object):
     def put(data, **kwargs):
         return Keep.global_client_object().put(data, **kwargs)
 
+class KeepBlockCache(object):
+    # Default RAM cache is 256MiB
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
+        self.cache_max = cache_max
+        self._cache = []
+        self._cache_lock = threading.Lock()
+
+    class CacheSlot(object):
+        def __init__(self, locator):
+            self.locator = locator
+            self.ready = threading.Event()
+            self.content = None
+
+        def get(self):
+            self.ready.wait()
+            return self.content
+
+        def set(self, value):
+            self.content = value
+            self.ready.set()
+
+        def size(self):
+            if self.content is None:
+                return 0
+            else:
+                return len(self.content)
+
+    def cap_cache(self):
+        '''Cap the cache size to self.cache_max'''
+        self._cache_lock.acquire()
+        try:
+            # Select all slots except those where ready.is_set() and content is
+            # None (that means there was an error reading the block).
+            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+            sm = sum([slot.size() for slot in self._cache])
+            while len(self._cache) > 0 and sm > self.cache_max:
+                for i in xrange(len(self._cache)-1, -1, -1):
+                    if self._cache[i].ready.is_set():
+                        del self._cache[i]
+                        break
+                sm = sum([slot.size() for slot in self._cache])
+        finally:
+            self._cache_lock.release()
+
+    def reserve_cache(self, locator):
+        '''Reserve a cache slot for the specified locator,
+        or return the existing slot.'''
+        self._cache_lock.acquire()
+        try:
+            # Test if the locator is already in the cache
+            for i in xrange(0, len(self._cache)):
+                if self._cache[i].locator == locator:
+                    n = self._cache[i]
+                    if i != 0:
+                        # move it to the front
+                        del self._cache[i]
+                        self._cache.insert(0, n)
+                    return n, False
+
+            # Add a new cache slot for the locator
+            n = KeepBlockCache.CacheSlot(locator)
+            self._cache.insert(0, n)
+            return n, True
+        finally:
+            self._cache_lock.release()
 
 class KeepClient(object):
+
+    # Default Keep server connection timeout:  3 seconds
+    # Default Keep server read timeout:       30 seconds
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:        60 seconds
+    DEFAULT_TIMEOUT = (3, 30)
+    DEFAULT_PROXY_TIMEOUT = (20, 60)
+
     class ThreadLimiter(object):
         """
         Limit the number of threads running at a given time to
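For reference, a minimal usage sketch (not part of this commit) of the KeepBlockCache added above; fetch_block() stands in for the real Keep GET path and the locator value is made up:

cache = KeepBlockCache(cache_max=64 * 1024 * 1024)
slot, first = cache.reserve_cache("acbd18db4cc2f85cedef654fccc4a4d8")
if first:
    # This caller fills the slot; other threads asking for the same locator
    # block in slot.get() until set() is called.
    slot.set(fetch_block())
    cache.cap_cache()      # drop ready slots once the total exceeds cache_max
else:
    content = slot.get()   # wait for the filling thread, then reuse its block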
@@ -203,10 +276,10 @@ class KeepClient(object):
 
     class KeepService(object):
         # Make requests to a single Keep service, and track results.
-        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
-                       ssl.SSLError)
+        HTTP_ERRORS = (requests.exceptions.RequestException,
+                       socket.error, ssl.SSLError)
 
-        def __init__(self, root, **headers):
+        def __init__(self, root, timeout=None, **headers):
             self.root = root
             self.last_result = None
             self.success_flag = None
@@ -222,19 +295,19 @@ class KeepClient(object):
 
         def last_status(self):
             try:
-                return int(self.last_result[0].status)
+                return self.last_result.status_code
             except (AttributeError, IndexError, ValueError):
                 return None
 
-        def get(self, http, locator):
-            # http is an httplib2.Http object.
+        def get(self, locator, timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
             try:
                 with timer.Timer() as t:
-                    result = http.request(url.encode('utf-8'), 'GET',
-                                          headers=self.get_headers)
+                    result = requests.get(url.encode('utf-8'),
+                                          headers=self.get_headers,
+                                          timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(e), str(e))
@@ -242,23 +315,26 @@ class KeepClient(object):
             else:
                 self.last_result = result
                 self.success_flag = retry.check_http_response_success(result)
-                content = result[1]
-                _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
+                content = result.content
+                _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                              self.last_status(), len(content), t.msecs,
-                             (len(content)/(1024*1024))/t.secs)
+                             (len(content)/(1024.0*1024))/t.secs)
                 if self.success_flag:
                     resp_md5 = hashlib.md5(content).hexdigest()
                     if resp_md5 == locator.md5sum:
                         return content
-                    _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+                    _logger.warning("Checksum fail: md5(%s) = %s",
+                                    url, resp_md5)
             return None
 
-        def put(self, http, hash_s, body):
+        def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             try:
-                result = http.request(url.encode('utf-8'), 'PUT',
-                                      headers=self.put_headers, body=body)
+                result = requests.put(url.encode('utf-8'),
+                                      data=body,
+                                      headers=self.put_headers,
+                                      timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(e), str(e))
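KeepService.get() and put() now pass their timeout straight to the requests library, which accepts either a single float or a (connection_timeout, read_timeout) tuple. A simplified sketch of the equivalent bare call (hypothetical URL and token; checksum and error handling omitted):

import requests

url = 'https://keep0.example.org:25107/acbd18db4cc2f85cedef654fccc4a4d8+3'
resp = requests.get(url,
                    headers={'Authorization': 'OAuth2 <api_token>'},
                    timeout=(3, 30))   # connect within 3s, read within 30s
if resp.status_code == 200:
    content = resp.content            # raw block bytes, as used above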
@@ -299,12 +375,13 @@ class KeepClient(object):
                           str(threading.current_thread()),
                           self.args['data_hash'],
                           self.args['service_root'])
-            h = httplib2.Http(timeout=self.args.get('timeout', None))
             self._success = bool(self.service.put(
-                    h, self.args['data_hash'], self.args['data']))
+                self.args['data_hash'],
+                self.args['data'],
+                timeout=self.args.get('timeout', None)))
             status = self.service.last_status()
             if self._success:
-                resp, body = self.service.last_result
+                result = self.service.last_result
                 _logger.debug("KeepWriterThread %s succeeded %s %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
@@ -314,18 +391,20 @@ class KeepClient(object):
                 # we're talking to a proxy or other backend that
                 # stores to multiple copies for us.
                 try:
-                    replicas_stored = int(resp['x-keep-replicas-stored'])
+                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
                 except (KeyError, ValueError):
                     replicas_stored = 1
-                limiter.save_response(body.strip(), replicas_stored)
+                limiter.save_response(result.text.strip(), replicas_stored)
             elif status is not None:
                 _logger.debug("Request fail: PUT %s => %s %s",
                               self.args['data_hash'], status,
-                              self.service.last_result[1])
+                              self.service.last_result.text)
 
 
-    def __init__(self, api_client=None, proxy=None, timeout=60,
-                 api_token=None, local_store=None):
+    def __init__(self, api_client=None, proxy=None,
+                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
+                 api_token=None, local_store=None, block_cache=None,
+                 num_retries=0):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -336,8 +415,14 @@ class KeepClient(object):
           Keep proxy.  Otherwise, KeepClient will fall back to the setting
           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
           ensure KeepClient does not use a proxy, pass in an empty string.
-        * timeout: The timeout for all HTTP requests, in seconds.  Default
-          60.
+        * timeout: The timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout): see
+          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Default: (3, 30).
+        * proxy_timeout: The timeout (in seconds) for HTTP requests to
+          Keep proxies. A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout). Default: (20, 60).
         * api_token: If you're not using an API client, but only talking
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
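A short sketch of overriding these defaults at construction time (api_client is assumed to be an existing Arvados API client object):

kc = KeepClient(api_client=api_client,
                timeout=(5, 60),           # non-proxy Keep servers
                proxy_timeout=(30, 120))   # Keep proxies

# Omitting both arguments falls back to DEFAULT_TIMEOUT (3, 30) and
# DEFAULT_PROXY_TIMEOUT (20, 60).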
@@ -349,33 +434,40 @@ class KeepClient(object):
           environment variable.  If you want to ensure KeepClient does not
           use local storage, pass in an empty string.  This is primarily
           intended to mock a server for testing.
+        * num_retries: The default number of times to retry failed requests.
+          This will be used as the default num_retries value when get() and
+          put() are called.  Default 0.
+
         """
         self.lock = threading.Lock()
         if proxy is None:
             proxy = config.get('ARVADOS_KEEP_PROXY')
         if api_token is None:
-            api_token = config.get('ARVADOS_API_TOKEN')
+            if api_client is None:
+                api_token = config.get('ARVADOS_API_TOKEN')
+            else:
+                api_token = api_client.api_token
         elif api_client is not None:
             raise ValueError(
                 "can't build KeepClient with both API client and token")
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
+        self.block_cache = block_cache if block_cache else KeepBlockCache()
+
         if local_store:
             self.local_store = local_store
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
-            self.timeout = timeout
-            self.cache_max = 256 * 1024 * 1024  # Cache is 256MiB
-            self._cache = []
-            self._cache_lock = threading.Lock()
+            self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
                 self.service_roots = [proxy]
                 self.using_proxy = True
+                self.timeout = proxy_timeout
                 self.static_service_roots = True
             else:
                 # It's important to avoid instantiating an API client
@@ -386,6 +478,7 @@ class KeepClient(object):
                 self.api_token = api_client.api_token
                 self.service_roots = None
                 self.using_proxy = None
+                self.timeout = timeout
                 self.static_service_roots = False
 
     def build_service_roots(self, force_rebuild=False):
@@ -402,14 +495,14 @@ class KeepClient(object):
             if not keep_services:
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = (keep_services[0].get('service_type') ==
-                                'proxy')
+            self.using_proxy = any(ks.get('service_type') == 'proxy'
+                                   for ks in keep_services)
 
-            roots = (("http%s://%s:%d/" %
-                      ('s' if f['service_ssl_flag'] else '',
-                       f['service_host'],
-                       f['service_port']))
-                     for f in keep_services)
+            roots = ("{}://[{}]:{:d}/".format(
+                        'https' if ks['service_ssl_flag'] else 'http',
+                         ks['service_host'],
+                         ks['service_port'])
+                     for ks in keep_services)
             self.service_roots = sorted(set(roots))
             _logger.debug(str(self.service_roots))
 
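For one hypothetical keep_services entry, the new format string yields a bracketed host, which keeps IPv6 literal addresses valid in URLs:

ks = {'service_ssl_flag': True,
      'service_host': 'keep0.example.org',
      'service_port': 25107}
root = "{}://[{}]:{:d}/".format('https' if ks['service_ssl_flag'] else 'http',
                                ks['service_host'],
                                ks['service_port'])
# root == 'https://[keep0.example.org]:25107/'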
@@ -460,59 +553,6 @@ class KeepClient(object):
         _logger.debug(str(pseq))
         return pseq
 
-    class CacheSlot(object):
-        def __init__(self, locator):
-            self.locator = locator
-            self.ready = threading.Event()
-            self.content = None
-
-        def get(self):
-            self.ready.wait()
-            return self.content
-
-        def set(self, value):
-            self.content = value
-            self.ready.set()
-
-        def size(self):
-            if self.content == None:
-                return 0
-            else:
-                return len(self.content)
-
-    def cap_cache(self):
-        '''Cap the cache size to self.cache_max'''
-        self._cache_lock.acquire()
-        try:
-            self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
-            sm = sum([slot.size() for slot in self._cache])
-            while sm > self.cache_max:
-                del self._cache[-1]
-                sm = sum([slot.size() for a in self._cache])
-        finally:
-            self._cache_lock.release()
-
-    def reserve_cache(self, locator):
-        '''Reserve a cache slot for the specified locator,
-        or return the existing slot.'''
-        self._cache_lock.acquire()
-        try:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
-
-            # Add a new cache slot for the locator
-            n = KeepClient.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
-        finally:
-            self._cache_lock.release()
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
@@ -544,7 +584,8 @@ class KeepClient(object):
         else:
             return None
 
-    def get(self, loc_s, num_retries=0):
+    @retry.retry_method
+    def get(self, loc_s, num_retries=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -561,14 +602,15 @@ class KeepClient(object):
           *each* Keep server if it returns temporary failures, with
           exponential backoff.  Note that, in each loop, the method may try
           to fetch data from every available Keep service, along with any
-          that are named in location hints in the locator.  Default 0.
+          that are named in location hints in the locator.  The default value
+          is set when the KeepClient is initialized.
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
 
-        slot, first = self.reserve_cache(expect_hash)
+        slot, first = self.block_cache.reserve_cache(expect_hash)
         if not first:
             v = slot.get()
             return v
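Sketch of the intended effect of @retry.retry_method (assuming the decorator substitutes the client-level default whenever num_retries is None; the locator is made up):

kc = KeepClient(api_client=api_client, num_retries=3)
data = kc.get('acbd18db4cc2f85cedef654fccc4a4d8+3')                 # up to 3 retries per server
data = kc.get('acbd18db4cc2f85cedef654fccc4a4d8+3', num_retries=0)  # explicit value wins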
@@ -599,16 +641,15 @@ class KeepClient(object):
             services_to_try = [roots_map[root]
                                for root in (local_roots + hint_roots)
                                if roots_map[root].usable()]
-            http = httplib2.Http(timeout=self.timeout)
             for keep_service in services_to_try:
-                blob = keep_service.get(http, locator)
+                blob = keep_service.get(locator, timeout=self.timeout)
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
 
         # Always cache the result, then return it if we succeeded.
         slot.set(blob)
-        self.cap_cache()
+        self.block_cache.cap_cache()
         if loop.success():
             return blob
 
@@ -624,7 +665,8 @@ class KeepClient(object):
         else:
             raise arvados.errors.KeepReadError(loc_s)
 
-    def put(self, data, copies=2, num_retries=0):
+    @retry.retry_method
+    def put(self, data, copies=2, num_retries=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -639,7 +681,8 @@ class KeepClient(object):
           Default 2.
         * num_retries: The number of times to retry PUT requests to
           *each* Keep server if it returns temporary failures, with
-          exponential backoff.  Default 0.
+          exponential backoff.  The default value is set when the
+          KeepClient is initialized.
         """
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
@@ -685,7 +728,10 @@ class KeepClient(object):
             "Write fail for %s: wanted %d but wrote %d" %
             (data_hash, copies, thread_limiter.done()))
 
-    def local_store_put(self, data):
+    # Local storage methods need no-op num_retries arguments to keep
+    # integration tests happy.  With better isolation they could
+    # probably be removed again.
+    def local_store_put(self, data, num_retries=0):
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -694,7 +740,7 @@ class KeepClient(object):
                   os.path.join(self.local_store, md5))
         return locator
 
-    def local_store_get(self, loc_s):
+    def local_store_get(self, loc_s, num_retries=0):
         try:
             locator = KeepLocator(loc_s)
         except ValueError:
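Putting it together, a rough end-to-end sketch of the proxy path this commit targets (token and proxy URL are hypothetical); with a proxy configured, proxy_timeout rather than timeout governs each request:

kc = KeepClient(api_token='xxxxxxxxxxxxxxxxxxxx',
                proxy='https://keep.proxy.example.org:25107/',
                proxy_timeout=(20, 60))
loc = kc.put('foo', copies=2)   # returns a locator like 'acbd18db4cc2f85cedef654fccc4a4d8+3'
assert kc.get(loc) == 'foo'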