5824: Merge branch 'master' into 5824-keep-web
[arvados.git] / sdk / python / arvados / keep.py
index 63b99daedd3d3931ac3822da62bff1d556d0806e..8ed86fd79e96144886744dba81786ef21ef12ae8 100644 (file)
@@ -241,19 +241,34 @@ class KeepClient(object):
         Should be used in a "with" block.
         """
         def __init__(self, todo):
+            self._started = 0  # count of threads that have passed __enter__
             self._todo = todo
             self._done = 0
             self._response = None
+            self._start_lock = threading.Condition()  # guards _started; waited on in __enter__
             self._todo_lock = threading.Semaphore(todo)  # at most `todo` threads inside the "with" block
             self._done_lock = threading.Lock()
+            self._local = threading.local()  # per-thread storage written by set_sequence()
 
         def __enter__(self):
+            self._start_lock.acquire()  # held until the release() near the end
+            if getattr(self._local, 'sequence', None) is not None:
+                # This thread called set_sequence(N): block here until
+                # self._started has reached N, i.e. N threads have entered.
+                while self._started < self._local.sequence:
+                    self._start_lock.wait()  # Condition.wait() drops the lock while blocked
             self._todo_lock.acquire()  # may block for a slot; _start_lock is still held here
+            self._started += 1
+            self._start_lock.notifyAll()  # wake threads waiting for their sequence number
+            self._start_lock.release()  # NOTE(review): no try/finally; an exception above leaves the lock held
             return self
 
         def __exit__(self, type, value, traceback):
             self._todo_lock.release()  # give back the slot taken in __enter__
 
+        def set_sequence(self, sequence):
+            self._local.sequence = sequence  # thread-local: only the calling thread's __enter__ sees it
+
         def shall_i_proceed(self):
             """
             Return true if the current thread should do stuff. Return
@@ -517,7 +532,11 @@ class KeepClient(object):
             return self._success
 
         def run(self):
-            with self.args['thread_limiter'] as limiter:
+            limiter = self.args['thread_limiter']
+            sequence = self.args['thread_sequence']
+            if sequence is not None:
+                limiter.set_sequence(sequence)
+            with limiter:
                 if not limiter.shall_i_proceed():
                     # My turn arrived, but the job has been done without
                     # me.
@@ -651,6 +670,7 @@ class KeepClient(object):
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
+                self.max_replicas_per_service = 1
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -663,6 +683,7 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
+                self.max_replicas_per_service = 1
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -720,6 +741,11 @@ class KeepClient(object):
 
             self.using_proxy = any(ks.get('service_type') == 'proxy'
                                    for ks in self._keep_services)
+            # max_replicas_per_service is 1 for disk-type services; if any
+            # writable non-disk service is accessible, it is unknown or unlimited (None).
+            for ks in accessible:
+                if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
+                    self.max_replicas_per_service = None
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -738,7 +764,6 @@ class KeepClient(object):
         self.build_services_list(force_rebuild)
 
         sorted_roots = []
-
         # Use the services indicated by the given +K@... remote
         # service hints, if any are present and can be resolved to a
         # URI.
@@ -938,16 +963,16 @@ class KeepClient(object):
         locator = KeepLocator(loc_s)
 
         headers = {}
-        if self.using_proxy:
-            # Tell the proxy how many copies we want it to store
-            headers['X-Keep-Desired-Replication'] = str(copies)
+        # Tell the service (proxy or not) how many copies we want it to store
+        headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(copies)
+        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
+        thread_sequence = 0
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
+                sorted_roots = self.map_new_services(
                     roots_map, locator,
                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
             except Exception as error:
@@ -955,7 +980,8 @@ class KeepClient(object):
                 continue
 
             threads = []
-            for service_root, ks in roots_map.iteritems():
+            for service_root, ks in [(root, roots_map[root])
+                                     for root in sorted_roots]:
                 if ks.finished():
                     continue
                 t = KeepClient.KeepWriterThread(
@@ -964,9 +990,11 @@ class KeepClient(object):
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
-                    timeout=self.current_timeout(num_retries-tries_left))
+                    timeout=self.current_timeout(num_retries-tries_left),
+                    thread_sequence=thread_sequence)
                 t.start()
                 threads.append(t)
+                thread_sequence += 1
             for t in threads:
                 t.join()
             loop.save_result((thread_limiter.done() >= copies, len(threads)))
@@ -979,7 +1007,7 @@ class KeepClient(object):
                     data_hash, loop.last_result()))
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
-                              for key in local_roots
+                              for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(