Merge branch 'master' into 7490-datamanager-dont-die-return-error
[arvados.git] / sdk / python / arvados / keep.py
index b3d64a41981b7bcbaf293a2f90f43abc79128f1a..ec9f6f6422e4736133ada98e971213da6c014e9b 100644 (file)
@@ -2,6 +2,7 @@ import cStringIO
 import datetime
 import hashlib
 import logging
+import math
 import os
 import pycurl
 import Queue
@@ -219,21 +220,27 @@ class KeepClient(object):
     DEFAULT_PROXY_TIMEOUT = (20, 300)
 
     class ThreadLimiter(object):
-        """
-        Limit the number of threads running at a given time to
-        {desired successes} minus {successes reported}. When successes
-        reported == desired, wake up the remaining threads and tell
-        them to quit.
+        """Limit the number of threads writing to Keep at once.
+
+        This ensures that only a number of writer threads that could
+        potentially achieve the desired replication level run at once.
+        Once the desired replication level is achieved, queued threads
+        are instructed not to run.
 
         Should be used in a "with" block.
         """
-        def __init__(self, todo):
+        def __init__(self, want_copies, max_service_replicas):
             self._started = 0
-            self._todo = todo
+            self._want_copies = want_copies
             self._done = 0
             self._response = None
             self._start_lock = threading.Condition()
-            self._todo_lock = threading.Semaphore(todo)
+            if (not max_service_replicas) or (max_service_replicas >= want_copies):
+                max_threads = 1  # limit unset/zero, or a single service may store every copy
+            else:  # math.ceil() returns a float on Python 2; keep the semaphore counter integral
+                max_threads = int(math.ceil(float(want_copies) / max_service_replicas))
+            _logger.debug("Limiter max threads is %d", max_threads)
+            self._todo_lock = threading.Semaphore(max_threads)
             self._done_lock = threading.Lock()
             self._local = threading.local()
 
@@ -258,34 +265,28 @@ class KeepClient(object):
 
         def shall_i_proceed(self):
             """
-            Return true if the current thread should do stuff. Return
-            false if the current thread should just stop.
+            Return true while fewer replicas than want_copies have been
+            recorded, i.e. the calling thread should still write to Keep.
             """
             with self._done_lock:
-                return (self._done < self._todo)
+                return (self._done < self._want_copies)
 
         def save_response(self, response_body, replicas_stored):
             """
             Records a response body (a locator, possibly signed) returned by
-            the Keep server.  It is not necessary to save more than
-            one response, since we presume that any locator returned
-            in response to a successful request is valid.
+            the Keep server (replacing any earlier one), and adds replicas_stored to the total.
             """
             with self._done_lock:
                 self._done += replicas_stored
                 self._response = response_body
 
         def response(self):
-            """
-            Returns the body from the response to a PUT request.
-            """
+            """Return the most recently saved PUT response body (None if none was saved)."""
             with self._done_lock:
                 return self._response
 
         def done(self):
-            """
-            Return how many successes were reported.
-            """
+            """Return the sum of replicas_stored over all save_response() calls."""
             with self._done_lock:
                 return self._done
 
@@ -645,6 +646,7 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
+            self.max_replicas_per_service = None
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -658,7 +660,6 @@ class KeepClient(object):
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
-                self.max_replicas_per_service = None
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -671,7 +672,6 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
-                self.max_replicas_per_service = 1
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -955,10 +955,8 @@ class KeepClient(object):
         # Tell the proxy how many copies we want it to store
         headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        thread_sequence = 0
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -968,6 +966,8 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
+            thread_limiter = KeepClient.ThreadLimiter(
+                copies, self.max_replicas_per_service)
             threads = []
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
@@ -980,10 +980,9 @@ class KeepClient(object):
                     service_root=service_root,
                     thread_limiter=thread_limiter,
                     timeout=self.current_timeout(num_retries-tries_left),
-                    thread_sequence=thread_sequence)
+                    thread_sequence=len(threads))
                 t.start()
                 threads.append(t)
-                thread_sequence += 1
             for t in threads:
                 t.join()
             loop.save_result((thread_limiter.done() >= copies, len(threads)))