Merge branch '3198-inode-cache' into 3198-writable-fuse, fix tests.
[arvados.git] / sdk / python / arvados / keep.py
index 0c7f4674f9fd62eacf6662897502c496ff4d2b76..cca9345bc5e9760d4b8f2d7a0b1d65aa69ffb5e1 100644 (file)
@@ -62,7 +62,7 @@ class KeepLocator(object):
             self.size = None
         for hint in pieces:
             if self.HINT_RE.match(hint) is None:
             self.size = None
         for hint in pieces:
             if self.HINT_RE.match(hint) is None:
-                raise ValueError("unrecognized hint data {}".format(hint))
+                raise ValueError("invalid hint format: {}".format(hint))
             elif hint.startswith('A'):
                 self.parse_permission_hint(hint)
             else:
             elif hint.startswith('A'):
                 self.parse_permission_hint(hint)
             else:
@@ -449,16 +449,21 @@ class KeepClient(object):
           KeepClient does not use a proxy, pass in an empty string.
 
         :timeout:
           KeepClient does not use a proxy, pass in an empty string.
 
         :timeout:
-          The timeout (in seconds) for HTTP requests to Keep
+          The initial timeout (in seconds) for HTTP requests to Keep
           non-proxy servers.  A tuple of two floats is interpreted as
           (connection_timeout, read_timeout): see
           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
           non-proxy servers.  A tuple of two floats is interpreted as
           (connection_timeout, read_timeout): see
           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Because timeouts are often a result of transient server load, the
+          actual connection timeout will be increased by a factor of two on
+          each retry.
           Default: (2, 300).
 
         :proxy_timeout:
           Default: (2, 300).
 
         :proxy_timeout:
-          The timeout (in seconds) for HTTP requests to
+          The initial timeout (in seconds) for HTTP requests to
           Keep proxies. A tuple of two floats is interpreted as
           Keep proxies. A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout). Default: (20, 300).
+          (connection_timeout, read_timeout). The behavior described
+          above for adjusting connection timeouts on retry also applies.
+          Default: (20, 300).
 
         :api_token:
           If you're not using an API client, but only talking
 
         :api_token:
           If you're not using an API client, but only talking
@@ -513,6 +518,7 @@ class KeepClient(object):
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
+                self._gateway_services = {}
                 self._keep_services = [{
                     'uuid': 'proxy',
                     '_service_root': proxy,
                 self._keep_services = [{
                     'uuid': 'proxy',
                     '_service_root': proxy,
@@ -526,18 +532,25 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
+                self._gateway_services = {}
                 self._keep_services = None
                 self.using_proxy = None
                 self._static_services_list = False
 
                 self._keep_services = None
                 self.using_proxy = None
                 self._static_services_list = False
 
-    def current_timeout(self):
-        """Return the appropriate timeout to use for this client: the proxy
-        timeout setting if the backend service is currently a proxy,
-        the regular timeout setting otherwise.
+    def current_timeout(self, attempt_number):
+        """Return the appropriate timeout to use for this client.
+
+        The proxy timeout setting if the backend service is currently a proxy,
+        the regular timeout setting otherwise.  The `attempt_number` indicates
+        how many times the operation has been tried already (starting from 0
+        for the first try), and scales the connection timeout portion of the
+        return value accordingly.
+
         """
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
         """
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
-        return self.proxy_timeout if self.using_proxy else self.timeout
+        t = self.proxy_timeout if self.using_proxy else self.timeout
+        return (t[0] * (1 << attempt_number), t[1])
 
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
 
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
@@ -549,21 +562,31 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            self._keep_services = keep_services.execute().get('items')
-            if not self._keep_services:
+            accessible = keep_services.execute().get('items')
+            if not accessible:
                 raise arvados.errors.NoKeepServersError()
 
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
-
             # Precompute the base URI for each service.
             # Precompute the base URI for each service.
-            for r in self._keep_services:
+            for r in accessible:
                 r['_service_root'] = "{}://[{}]:{:d}/".format(
                     'https' if r['service_ssl_flag'] else 'http',
                     r['service_host'],
                     r['service_port'])
                 r['_service_root'] = "{}://[{}]:{:d}/".format(
                     'https' if r['service_ssl_flag'] else 'http',
                     r['service_host'],
                     r['service_port'])
+
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
+            _logger.debug(str(self._gateway_services))
+
+            self._keep_services = [
+                ks for ks in accessible
+                if ks.get('service_type') in ['disk', 'proxy']]
             _logger.debug(str(self._keep_services))
 
             _logger.debug(str(self._keep_services))
 
+            self.using_proxy = any(ks.get('service_type') == 'proxy'
+                                   for ks in self._keep_services)
+
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
@@ -573,31 +596,46 @@ class KeepClient(object):
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
-    def weighted_service_roots(self, data_hash, force_rebuild=False):
+    def weighted_service_roots(self, locator, force_rebuild=False):
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
-        the given hash.
+        the given hash+hints.
         """
         self.build_services_list(force_rebuild)
 
         """
         self.build_services_list(force_rebuild)
 
-        # Sort the available services by weight (heaviest first) for
-        # this data_hash, and return their service_roots (base URIs)
+        sorted_roots = []
+
+        # Use the services indicated by the given +K@... remote
+        # service hints, if any are present and can be resolved to a
+        # URI.
+        for hint in locator.hints:
+            if hint.startswith('K@'):
+                if len(hint) == 7:
+                    sorted_roots.append(
+                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+                elif len(hint) == 29:
+                    svc = self._gateway_services.get(hint[2:])
+                    if svc:
+                        sorted_roots.append(svc['_service_root'])
+
+        # Sort the available local services by weight (heaviest first)
+        # for this locator, and return their service_roots (base URIs)
         # in that order.
         # in that order.
-        sorted_roots = [
+        sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
                 self._keep_services,
                 reverse=True,
             svc['_service_root'] for svc in sorted(
                 self._keep_services,
                 reverse=True,
-                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
-        _logger.debug(data_hash + ': ' + str(sorted_roots))
+                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
+        _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
         return sorted_roots
 
-    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
+        local_roots = self.weighted_service_roots(locator, force_rebuild)
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(root, self.session, **headers)
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(root, self.session, **headers)
@@ -624,7 +662,7 @@ class KeepClient(object):
     def get_from_cache(self, loc):
         """Fetch a block only if is in the cache, otherwise return None."""
         slot = self.block_cache.get(loc)
     def get_from_cache(self, loc):
         """Fetch a block only if is in the cache, otherwise return None."""
         slot = self.block_cache.get(loc)
-        if slot.ready.is_set():
+        if slot and slot.ready.is_set():
             return slot.get()
         else:
             return None
             return slot.get()
         else:
             return None
@@ -653,28 +691,40 @@ class KeepClient(object):
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
-        expect_hash = locator.md5sum
-        slot, first = self.block_cache.reserve_cache(expect_hash)
+        slot, first = self.block_cache.reserve_cache(locator.md5sum)
         if not first:
             v = slot.get()
             return v
 
         if not first:
             v = slot.get()
             return v
 
+        # If the locator has hints specifying a prefix (indicating a
+        # remote keepproxy) or the UUID of a local gateway service,
+        # read data from the indicated service(s) instead of the usual
+        # list of local disk services.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                           for hint in locator.hints if (
+                                   hint.startswith('K@') and
+                                   len(hint) == 29 and
+                                   self._gateway_services.get(hint[2:])
+                                   )])
+        # Map root URLs to their KeepService objects.
+        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
         #   it's being updated.
         # * Retry until we succeed, we're out of retries, or every available
         #   service has returned permanent failure.
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
         #   it's being updated.
         # * Retry until we succeed, we're out of retries, or every available
         #   service has returned permanent failure.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@')]
-        # Map root URLs their KeepService objects.
-        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+        sorted_roots = []
+        roots_map = {}
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
-                    roots_map, expect_hash,
+                sorted_roots = self.map_new_services(
+                    roots_map, locator,
                     force_rebuild=(tries_left < num_retries))
             except Exception as error:
                 loop.save_result(error)
                     force_rebuild=(tries_left < num_retries))
             except Exception as error:
                 loop.save_result(error)
@@ -683,10 +733,10 @@ class KeepClient(object):
             # Query KeepService objects that haven't returned
             # permanent failure, in our specified shuffle order.
             services_to_try = [roots_map[root]
             # Query KeepService objects that haven't returned
             # permanent failure, in our specified shuffle order.
             services_to_try = [roots_map[root]
-                               for root in (local_roots + hint_roots)
+                               for root in sorted_roots
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
-                blob = keep_service.get(locator, timeout=self.current_timeout())
+                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
@@ -697,22 +747,17 @@ class KeepClient(object):
         if loop.success():
             return blob
 
         if loop.success():
             return blob
 
-        try:
-            all_roots = local_roots + hint_roots
-        except NameError:
-            # We never successfully fetched local_roots.
-            all_roots = hint_roots
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
-        not_founds = sum(1 for key in all_roots
+        not_founds = sum(1 for key in sorted_roots
                          if roots_map[key].last_status() in {403, 404, 410})
         service_errors = ((key, roots_map[key].last_result)
                          if roots_map[key].last_status() in {403, 404, 410})
         service_errors = ((key, roots_map[key].last_result)
-                          for key in all_roots)
+                          for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
                 "failed to read {}: no Keep services available ({})".format(
                     loc_s, loop.last_result()))
         if not roots_map:
             raise arvados.errors.KeepReadError(
                 "failed to read {}: no Keep services available ({})".format(
                     loc_s, loop.last_result()))
-        elif not_founds == len(all_roots):
+        elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.NotFoundError(
                 "{} not found".format(loc_s), service_errors)
         else:
@@ -747,6 +792,7 @@ class KeepClient(object):
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
             return data_hash
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
             return data_hash
+        locator = KeepLocator(data_hash + '+' + str(len(data)))
 
         headers = {}
         if self.using_proxy:
 
         headers = {}
         if self.using_proxy:
@@ -759,7 +805,7 @@ class KeepClient(object):
         for tries_left in loop:
             try:
                 local_roots = self.map_new_services(
         for tries_left in loop:
             try:
                 local_roots = self.map_new_services(
-                    roots_map, data_hash,
+                    roots_map, locator,
                     force_rebuild=(tries_left < num_retries), **headers)
             except Exception as error:
                 loop.save_result(error)
                     force_rebuild=(tries_left < num_retries), **headers)
             except Exception as error:
                 loop.save_result(error)
@@ -775,7 +821,7 @@ class KeepClient(object):
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
-                    timeout=self.current_timeout())
+                    timeout=self.current_timeout(num_retries-tries_left))
                 t.start()
                 threads.append(t)
             for t in threads:
                 t.start()
                 threads.append(t)
             for t in threads: