5305: Add heuristics to choose name when collection is referenced by PDH instead...
[arvados.git] / sdk / python / arvados / keep.py
index 23d8c20db546c8e1f22abe2661a270c8ea614fb2..71dc7ce7af863f60fb9e741b7bbc1231c22b146b 100644 (file)
@@ -394,11 +394,11 @@ class KeepClient(object):
                     replicas_stored = int(result.headers['x-keep-replicas-stored'])
                 except (KeyError, ValueError):
                     replicas_stored = 1
                     replicas_stored = int(result.headers['x-keep-replicas-stored'])
                 except (KeyError, ValueError):
                     replicas_stored = 1
-                limiter.save_response(result.text.strip(), replicas_stored)
+                limiter.save_response(result.content.strip(), replicas_stored)
             elif status is not None:
                 _logger.debug("Request fail: PUT %s => %s %s",
                               self.args['data_hash'], status,
             elif status is not None:
                 _logger.debug("Request fail: PUT %s => %s %s",
                               self.args['data_hash'], status,
-                              self.service.last_result.text)
+                              self.service.last_result.content)
 
 
     def __init__(self, api_client=None, proxy=None,
 
 
     def __init__(self, api_client=None, proxy=None,
@@ -517,16 +517,16 @@ class KeepClient(object):
                     r['service_port'])
             _logger.debug(str(self._keep_services))
 
                     r['service_port'])
             _logger.debug(str(self._keep_services))
 
-    def _service_weight(self, hash, service_uuid):
+    def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
 
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
 
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
-        return hashlib.md5(hash + service_uuid[-15:]).hexdigest()
+        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
 
-    def weighted_service_roots(self, hash, force_rebuild=False):
+    def weighted_service_roots(self, data_hash, force_rebuild=False):
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
         the given hash.
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
         the given hash.
@@ -534,14 +534,14 @@ class KeepClient(object):
         self.build_services_list(force_rebuild)
 
         # Sort the available services by weight (heaviest first) for
         self.build_services_list(force_rebuild)
 
         # Sort the available services by weight (heaviest first) for
-        # this hash, and return their service_roots (base URIs) in
-        # that order.
+        # this data_hash, and return their service_roots (base URIs)
+        # in that order.
         sorted_roots = [
             svc['_service_root'] for svc in sorted(
                 self._keep_services,
                 reverse=True,
         sorted_roots = [
             svc['_service_root'] for svc in sorted(
                 self._keep_services,
                 reverse=True,
-                key=lambda svc: self._service_weight(hash, svc['uuid']))]
-        _logger.debug(hash + ': ' + str(sorted_roots))
+                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
+        _logger.debug(data_hash + ': ' + str(sorted_roots))
         return sorted_roots
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         return sorted_roots
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
@@ -643,17 +643,27 @@ class KeepClient(object):
         if loop.success():
             return blob
 
         if loop.success():
             return blob
 
-        # No servers fulfilled the request.  Count how many responded
-        # "not found;" if the ratio is high enough (currently 75%), report
-        # Not Found; otherwise a generic error.
+        try:
+            all_roots = local_roots + hint_roots
+        except NameError:
+            # We never successfully fetched local_roots.
+            all_roots = hint_roots
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
-        not_founds = sum(1 for ks in roots_map.values()
-                         if ks.last_status() in set([403, 404, 410]))
-        if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
-            raise arvados.errors.NotFoundError(loc_s)
+        not_founds = sum(1 for key in all_roots
+                         if roots_map[key].last_status() in {403, 404, 410})
+        service_errors = ((key, roots_map[key].last_result)
+                          for key in all_roots)
+        if not roots_map:
+            raise arvados.errors.KeepReadError(
+                "failed to read {}: no Keep services available ({})".format(
+                    loc_s, loop.last_result()))
+        elif not_founds == len(all_roots):
+            raise arvados.errors.NotFoundError(
+                "{} not found".format(loc_s), service_errors)
         else:
         else:
-            raise arvados.errors.KeepReadError(loc_s)
+            raise arvados.errors.KeepReadError(
+                "failed to read {}".format(loc_s), service_errors)
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None):
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None):
@@ -674,6 +684,12 @@ class KeepClient(object):
           exponential backoff.  The default value is set when the
           KeepClient is initialized.
         """
           exponential backoff.  The default value is set when the
           KeepClient is initialized.
         """
+
+        if isinstance(data, unicode):
+            data = data.encode("ascii")
+        elif not isinstance(data, str):
+            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
             return data_hash
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
             return data_hash
@@ -714,14 +730,30 @@ class KeepClient(object):
 
         if loop.success():
             return thread_limiter.response()
 
         if loop.success():
             return thread_limiter.response()
-        raise arvados.errors.KeepWriteError(
-            "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, copies, thread_limiter.done()))
-
-    # Local storage methods need no-op num_retries arguments to keep
-    # integration tests happy.  With better isolation they could
-    # probably be removed again.
-    def local_store_put(self, data, num_retries=0):
+        if not roots_map:
+            raise arvados.errors.KeepWriteError(
+                "failed to write {}: no Keep services available ({})".format(
+                    data_hash, loop.last_result()))
+        else:
+            service_errors = ((key, roots_map[key].last_result)
+                              for key in local_roots
+                              if not roots_map[key].success_flag)
+            raise arvados.errors.KeepWriteError(
+                "failed to write {} (wanted {} copies but wrote {})".format(
+                    data_hash, copies, thread_limiter.done()), service_errors)
+
+    def local_store_put(self, data, copies=1, num_retries=None):
+        """A stub for put().
+
+        This method is used in place of the real put() method when
+        using local storage (see constructor's local_store argument).
+
+        copies and num_retries arguments are ignored: they are here
+        only for the sake of offering the same call signature as
+        put().
+
+        Data stored this way can be retrieved via local_store_get().
+        """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -730,7 +762,8 @@ class KeepClient(object):
                   os.path.join(self.local_store, md5))
         return locator
 
                   os.path.join(self.local_store, md5))
         return locator
 
-    def local_store_get(self, loc_s, num_retries=0):
+    def local_store_get(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
         try:
             locator = KeepLocator(loc_s)
         except ValueError:
         try:
             locator = KeepLocator(loc_s)
         except ValueError: