2853: Merge branch 'master' into 2853-rendezvous
[arvados.git] / sdk / python / arvados / keep.py
index d307363950464ce5918967e29f7327a89fa3ce11..23d8c20db546c8e1f22abe2661a270c8ea614fb2 100644 (file)
@@ -21,15 +21,15 @@ import ssl
 import socket
 import requests
 
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
-
 import arvados
 import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
 
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -209,6 +209,14 @@ class KeepBlockCache(object):
             self._cache_lock.release()
 
 class KeepClient(object):
+
+    # Default Keep server connection timeout:  2 seconds
+    # Default Keep server read timeout:      300 seconds
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:       300 seconds
+    DEFAULT_TIMEOUT = (2, 300)
+    DEFAULT_PROXY_TIMEOUT = (20, 300)
+
     class ThreadLimiter(object):
         """
         Limit the number of threads running at a given time to
@@ -271,7 +279,7 @@ class KeepClient(object):
         HTTP_ERRORS = (requests.exceptions.RequestException,
                        socket.error, ssl.SSLError)
 
-        def __init__(self, root, timeout=None, **headers):
+        def __init__(self, root, **headers):
             self.root = root
             self.last_result = None
             self.success_flag = None
@@ -288,7 +296,7 @@ class KeepClient(object):
         def last_status(self):
             try:
                 return self.last_result.status_code
-            except (AttributeError, IndexError, ValueError):
+            except AttributeError:
                 return None
 
         def get(self, locator, timeout=None):
@@ -393,7 +401,8 @@ class KeepClient(object):
                               self.service.last_result.text)
 
 
-    def __init__(self, api_client=None, proxy=None, timeout=300,
+    def __init__(self, api_client=None, proxy=None,
+                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
                  num_retries=0):
         """Initialize a new KeepClient.
@@ -406,9 +415,14 @@ class KeepClient(object):
           Keep proxy.  Otherwise, KeepClient will fall back to the setting
           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
           ensure KeepClient does not use a proxy, pass in an empty string.
-        * timeout: The timeout for all HTTP requests, in seconds.  Default
-          300. A tuple of two floats is interpreted as (connection_timeout,
-          read_timeout)
+        * timeout: The timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout): see
+          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Default: (2, 300).
+        * proxy_timeout: The timeout (in seconds) for HTTP requests to
+          Keep proxies. A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout). Default: (20, 300).
         * api_token: If you're not using an API client, but only talking
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
@@ -439,21 +453,25 @@ class KeepClient(object):
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
         self.block_cache = block_cache if block_cache else KeepBlockCache()
+        self.timeout = timeout
+        self.proxy_timeout = proxy_timeout
 
         if local_store:
             self.local_store = local_store
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
-            self.timeout = timeout
             self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
-                self.service_roots = [proxy]
+                self._keep_services = [{
+                    'uuid': 'proxy',
+                    '_service_root': proxy,
+                    }]
                 self.using_proxy = True
-                self.static_service_roots = True
+                self._static_services_list = True
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -461,13 +479,22 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
-                self.service_roots = None
+                self._keep_services = None
                 self.using_proxy = None
-                self.static_service_roots = False
+                self._static_services_list = False
+
+    def current_timeout(self):
+        """Return the appropriate timeout to use for this client: the proxy
+        timeout setting if the backend service is currently a proxy,
+        the regular timeout setting otherwise.
+        """
+        # TODO(twp): the timeout should be a property of a
+        # KeepService, not a KeepClient. See #4488.
+        return self.proxy_timeout if self.using_proxy else self.timeout
 
-    def build_service_roots(self, force_rebuild=False):
-        if (self.static_service_roots or
-              (self.service_roots and not force_rebuild)):
+    def build_services_list(self, force_rebuild=False):
+        if (self._static_services_list or
+              (self._keep_services and not force_rebuild)):
             return
         with self.lock:
             try:
@@ -475,68 +502,47 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            keep_services = keep_services.execute().get('items')
-            if not keep_services:
+            self._keep_services = keep_services.execute().get('items')
+            if not self._keep_services:
                 raise arvados.errors.NoKeepServersError()
 
             self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in keep_services)
-
-            roots = ("{}://[{}]:{:d}/".format(
-                        'https' if ks['service_ssl_flag'] else 'http',
-                         ks['service_host'],
-                         ks['service_port'])
-                     for ks in keep_services)
-            self.service_roots = sorted(set(roots))
-            _logger.debug(str(self.service_roots))
-
-    def shuffled_service_roots(self, hash, force_rebuild=False):
-        self.build_service_roots(force_rebuild)
-
-        # Build an ordering with which to query the Keep servers based on the
-        # contents of the hash.
-        # "hash" is a hex-encoded number at least 8 digits
-        # (32 bits) long
-
-        # seed used to calculate the next keep server from 'pool'
-        # to be added to 'pseq'
-        seed = hash
-
-        # Keep servers still to be added to the ordering
-        pool = self.service_roots[:]
-
-        # output probe sequence
-        pseq = []
-
-        # iterate while there are servers left to be assigned
-        while len(pool) > 0:
-            if len(seed) < 8:
-                # ran out of digits in the seed
-                if len(pseq) < len(hash) / 4:
-                    # the number of servers added to the probe sequence is less
-                    # than the number of 4-digit slices in 'hash' so refill the
-                    # seed with the last 4 digits and then append the contents
-                    # of 'hash'.
-                    seed = hash[-4:] + hash
-                else:
-                    # refill the seed with the contents of 'hash'
-                    seed += hash
-
-            # Take the next 8 digits (32 bytes) and interpret as an integer,
-            # then modulus with the size of the remaining pool to get the next
-            # selected server.
-            probe = int(seed[0:8], 16) % len(pool)
-
-            # Append the selected server to the probe sequence and remove it
-            # from the pool.
-            pseq += [pool[probe]]
-            pool = pool[:probe] + pool[probe+1:]
-
-            # Remove the digits just used from the seed
-            seed = seed[8:]
-        _logger.debug(str(pseq))
-        return pseq
+                                   for ks in self._keep_services)
+
+            # Precompute the base URI for each service.
+            for r in self._keep_services:
+                r['_service_root'] = "{}://[{}]:{:d}/".format(
+                    'https' if r['service_ssl_flag'] else 'http',
+                    r['service_host'],
+                    r['service_port'])
+            _logger.debug(str(self._keep_services))
+
+    def _service_weight(self, hash, service_uuid):
+        """Compute the weight of a Keep service endpoint for a data
+        block with a known hash.
+
+        The weight is md5(h + u) where u is the last 15 characters of
+        the service endpoint's UUID.
+        """
+        return hashlib.md5(hash + service_uuid[-15:]).hexdigest()
 
+    def weighted_service_roots(self, hash, force_rebuild=False):
+        """Return an array of Keep service endpoints, in the order in
+        which they should be probed when reading or writing data with
+        the given hash.
+        """
+        self.build_services_list(force_rebuild)
+
+        # Sort the available services by weight (heaviest first) for
+        # this hash, and return their service_roots (base URIs) in
+        # that order.
+        sorted_roots = [
+            svc['_service_root'] for svc in sorted(
+                self._keep_services,
+                reverse=True,
+                key=lambda svc: self._service_weight(hash, svc['uuid']))]
+        _logger.debug(hash + ': ' + str(sorted_roots))
+        return sorted_roots
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
@@ -544,7 +550,7 @@ class KeepClient(object):
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(root, **headers)
@@ -626,7 +632,7 @@ class KeepClient(object):
                                for root in (local_roots + hint_roots)
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
-                blob = keep_service.get(locator, timeout=self.timeout)
+                blob = keep_service.get(locator, timeout=self.current_timeout())
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
@@ -699,7 +705,7 @@ class KeepClient(object):
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
-                    timeout=self.timeout)
+                    timeout=self.current_timeout())
                 t.start()
                 threads.append(t)
             for t in threads: