7696: Improve PySDK KeepClient.ThreadLimiter.
author Brett Smith <brett@curoverse.com>
Wed, 11 Nov 2015 22:08:39 +0000 (17:08 -0500)
committer Brett Smith <brett@curoverse.com>
Fri, 13 Nov 2015 14:29:24 +0000 (09:29 -0500)
* Move the calculation of how many threads to allow into the class.
* Teach it to handle cases where max_replicas_per_service is known and
  greater than 1.  This will never happen today, but is an anticipated
  improvement.
* Update docstrings to reflect current reality.

These are all changes I made while debugging the previous race
condition.

sdk/python/arvados/keep.py

index 2b718d7a4d58397e8e8d8aaec6cb81d078e65843..ec9f6f6422e4736133ada98e971213da6c014e9b 100644 (file)
@@ -2,6 +2,7 @@ import cStringIO
 import datetime
 import hashlib
 import logging
+import math
 import os
 import pycurl
 import Queue
@@ -219,21 +220,27 @@ class KeepClient(object):
     DEFAULT_PROXY_TIMEOUT = (20, 300)
 
     class ThreadLimiter(object):
-        """
-        Limit the number of threads running at a given time to
-        {desired successes} minus {successes reported}. When successes
-        reported == desired, wake up the remaining threads and tell
-        them to quit.
+        """Limit the number of threads writing to Keep at once.
+
+        This ensures that only a number of writer threads that could
+        potentially achieve the desired replication level run at once.
+        Once the desired replication level is achieved, queued threads
+        are instructed not to run.
 
         Should be used in a "with" block.
         """
-        def __init__(self, todo):
+        def __init__(self, want_copies, max_service_replicas):
             self._started = 0
-            self._todo = todo
+            self._want_copies = want_copies
             self._done = 0
             self._response = None
             self._start_lock = threading.Condition()
-            self._todo_lock = threading.Semaphore(todo)
+            if (not max_service_replicas) or (max_service_replicas >= want_copies):
+                max_threads = 1
+            else:
+                max_threads = math.ceil(float(want_copies) / max_service_replicas)
+            _logger.debug("Limiter max threads is %d", max_threads)
+            self._todo_lock = threading.Semaphore(max_threads)
             self._done_lock = threading.Lock()
             self._local = threading.local()
 
@@ -258,34 +265,28 @@ class KeepClient(object):
 
         def shall_i_proceed(self):
             """
-            Return true if the current thread should do stuff. Return
-            false if the current thread should just stop.
+            Return true if the current thread should write to Keep.
+            Return false otherwise.
             """
             with self._done_lock:
-                return (self._done < self._todo)
+                return (self._done < self._want_copies)
 
         def save_response(self, response_body, replicas_stored):
             """
             Records a response body (a locator, possibly signed) returned by
-            the Keep server.  It is not necessary to save more than
-            one response, since we presume that any locator returned
-            in response to a successful request is valid.
+            the Keep server, and the number of replicas it stored.
             """
             with self._done_lock:
                 self._done += replicas_stored
                 self._response = response_body
 
         def response(self):
-            """
-            Returns the body from the response to a PUT request.
-            """
+            """Return the body from the response to a PUT request."""
             with self._done_lock:
                 return self._response
 
         def done(self):
-            """
-            Return how many successes were reported.
-            """
+            """Return the total number of replicas successfully stored."""
             with self._done_lock:
                 return self._done
 
@@ -965,7 +966,8 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
+            thread_limiter = KeepClient.ThreadLimiter(
+                copies, self.max_replicas_per_service)
             threads = []
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]: