7161: clarify max_replicas_per_service
[arvados.git] / sdk / python / arvados / keep.py
index 28466304372f08c797072b4fed54269fd92751ef..1b076b61ae41b0200b27986631b13b822823c4c1 100644 (file)
@@ -76,7 +76,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} must be a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -648,12 +648,10 @@ class KeepClient(object):
                     'uuid': 'proxy',
                     '_service_root': proxy,
                     }]
-                self._writable_services = [{
-                    'uuid': 'proxy',
-                    '_service_root': proxy,
-                    }]
+                self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
+                self.max_replicas_per_service = None
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -666,6 +664,7 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
+                self.max_replicas_per_service = None
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -723,6 +722,12 @@ class KeepClient(object):
 
             self.using_proxy = any(ks.get('service_type') == 'proxy'
                                    for ks in self._keep_services)
+            # Set max_replicas_per_service to 1 for disk typed services.
+            # In case of, non-disk typed services, we will use None to mean unknown.
+            self.max_replicas_per_service = 1
+            for ks in accessible:
+                if ('disk' != ks.get('service_type')) and not ks.get('read_only'):
+                    self.max_replicas_per_service = None
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -741,15 +746,14 @@ class KeepClient(object):
         self.build_services_list(force_rebuild)
 
         sorted_roots = []
-
         # Use the services indicated by the given +K@... remote
         # service hints, if any are present and can be resolved to a
         # URI.
         for hint in locator.hints:
             if hint.startswith('K@'):
                 if len(hint) == 7:
-                    sorted_roots.append(
-                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+                     sorted_roots.append(
+                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                 elif len(hint) == 29:
                     svc = self._gateway_services.get(hint[2:])
                     if svc:
@@ -759,7 +763,7 @@ class KeepClient(object):
         # for this locator, and return their service_roots (base URIs)
         # in that order.
         use_services = self._keep_services
-        if (need_writable == True):
+        if need_writable:
           use_services = self._writable_services
         sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
@@ -932,19 +936,19 @@ class KeepClient(object):
         if isinstance(data, unicode):
             data = data.encode("ascii")
         elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
 
         data_hash = hashlib.md5(data).hexdigest()
+        loc_s = data_hash + '+' + str(len(data))
         if copies < 1:
-            return data_hash
-        locator = KeepLocator(data_hash + '+' + str(len(data)))
+            return loc_s
+        locator = KeepLocator(loc_s)
 
         headers = {}
-        if self.using_proxy:
-            # Tell the proxy how many copies we want it to store
-            headers['X-Keep-Desired-Replication'] = str(copies)
+        # Tell the proxy how many copies we want it to store
+        headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(copies)
+        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop: