5414: Add client support for Keep service hints.
[arvados.git] / sdk / python / arvados / keep.py
index 6196b502021a6036aa96c33d61c9bc6fb5d4f4f1..842a36d8ed6145062f166347f4b79126eadce196 100644 (file)
@@ -62,7 +62,7 @@ class KeepLocator(object):
             self.size = None
         for hint in pieces:
             if self.HINT_RE.match(hint) is None:
-                raise ValueError("unrecognized hint data {}".format(hint))
+                raise ValueError("invalid hint format: {}".format(hint))
             elif hint.startswith('A'):
                 self.parse_permission_hint(hint)
             else:
@@ -518,6 +518,7 @@ class KeepClient(object):
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
+                self._gateway_services = {}
                 self._keep_services = [{
                     'uuid': 'proxy',
                     '_service_root': proxy,
@@ -531,6 +532,7 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
+                self._gateway_services = {}
                 self._keep_services = None
                 self.using_proxy = None
                 self._static_services_list = False
@@ -560,21 +562,31 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            self._keep_services = keep_services.execute().get('items')
-            if not self._keep_services:
+            accessible = keep_services.execute().get('items')
+            if not accessible:
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
-
             # Precompute the base URI for each service.
-            for r in self._keep_services:
+            for r in accessible:
                 r['_service_root'] = "{}://[{}]:{:d}/".format(
                     'https' if r['service_ssl_flag'] else 'http',
                     r['service_host'],
                     r['service_port'])
+
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
+            _logger.debug(str(self._gateway_services))
+
+            self._keep_services = [
+                ks for ks in accessible
+                if ks.get('service_type') in ['disk', 'proxy']]
             _logger.debug(str(self._keep_services))
 
+            self.using_proxy = any(ks.get('service_type') == 'proxy'
+                                   for ks in self._keep_services)
+
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
@@ -584,31 +596,46 @@ class KeepClient(object):
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
-    def weighted_service_roots(self, data_hash, force_rebuild=False):
+    def weighted_service_roots(self, locator, force_rebuild=False):
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
-        the given hash.
+        the given locator (its md5sum plus any +K@ service hints).
         """
         self.build_services_list(force_rebuild)
 
-        # Sort the available services by weight (heaviest first) for
-        # this data_hash, and return their service_roots (base URIs)
+        sorted_roots = []
+
+        # Use the services indicated by the given +K@... remote
+        # service hints, if any are present and can be resolved to a
+        # URI.
+        for hint in locator.hints:
+            if hint.startswith('K@'):
+                if len(hint) == 7:
+                    sorted_roots.append(
+                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+                elif len(hint) == 29:
+                    svc = self._gateway_services.get(hint[2:])
+                    if svc:
+                        sorted_roots.append(svc['_service_root'])
+
+        # Sort the available local services by weight (heaviest first)
+        # for this locator, and return their service_roots (base URIs)
         # in that order.
-        sorted_roots = [
+        sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
                 self._keep_services,
                 reverse=True,
-                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
-        _logger.debug(data_hash + ': ' + str(sorted_roots))
+                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
+        _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
-    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
+        local_roots = self.weighted_service_roots(locator, force_rebuild)
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(root, self.session, **headers)
@@ -664,28 +691,40 @@ class KeepClient(object):
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
-        expect_hash = locator.md5sum
-        slot, first = self.block_cache.reserve_cache(expect_hash)
+        slot, first = self.block_cache.reserve_cache(locator.md5sum)
         if not first:
             v = slot.get()
             return v
 
+        # Hinted services (a +K@ prefix naming a remote keepproxy, or
+        # the UUID of a local gateway service) are probed before the
+        # usual local disk services.  NOTE(review): roots_map is reset
+        # below, so the map built here is discarded -- confirm intent.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                           for hint in locator.hints if (
+                                   hint.startswith('K@') and
+                                   len(hint) == 29 and
+                                   self._gateway_services.get(hint[2:])
+                                   )])
+        # Map root URLs to their KeepService objects.
+        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
         #   it's being updated.
         # * Retry until we succeed, we're out of retries, or every available
         #   service has returned permanent failure.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@')]
-        # Map root URLs their KeepService objects.
-        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+        sorted_roots = []
+        roots_map = {}
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
-                    roots_map, expect_hash,
+                sorted_roots = self.map_new_services(
+                    roots_map, locator,
                     force_rebuild=(tries_left < num_retries))
             except Exception as error:
                 loop.save_result(error)
@@ -694,7 +733,7 @@ class KeepClient(object):
             # Query KeepService objects that haven't returned
             # permanent failure, in our specified shuffle order.
             services_to_try = [roots_map[root]
-                               for root in (local_roots + hint_roots)
+                               for root in sorted_roots
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
@@ -708,22 +747,17 @@ class KeepClient(object):
         if loop.success():
             return blob
 
-        try:
-            all_roots = local_roots + hint_roots
-        except NameError:
-            # We never successfully fetched local_roots.
-            all_roots = hint_roots
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
-        not_founds = sum(1 for key in all_roots
+        not_founds = sum(1 for key in sorted_roots
                          if roots_map[key].last_status() in {403, 404, 410})
         service_errors = ((key, roots_map[key].last_result)
-                          for key in all_roots)
+                          for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
                 "failed to read {}: no Keep services available ({})".format(
                     loc_s, loop.last_result()))
-        elif not_founds == len(all_roots):
+        elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
                 "{} not found".format(loc_s), service_errors)
         else:
@@ -758,6 +792,7 @@ class KeepClient(object):
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
             return data_hash
+        locator = KeepLocator(data_hash + '+' + str(len(data)))
 
         headers = {}
         if self.using_proxy:
@@ -770,7 +805,7 @@ class KeepClient(object):
         for tries_left in loop:
             try:
                 local_roots = self.map_new_services(
-                    roots_map, data_hash,
+                    roots_map, locator,
                     force_rebuild=(tries_left < num_retries), **headers)
             except Exception as error:
                 loop.save_result(error)