8281: Limit # write threads to #copies remaining, not #copies total.
[arvados.git] / sdk / python / arvados / keep.py
index c584f04ca06226b842e690751fb323fe60ac256a..cd39f83703f4b341e07201a67c0afb58a11574ae 100644 (file)
@@ -230,13 +230,13 @@ class Counter(object):
 class KeepClient(object):
 
     # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:       64 seconds
+    # Default Keep server read timeout:       256 seconds
     # Default Keep server bandwidth minimum:  32768 bytes per second
     # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:        64 seconds
+    # Default Keep proxy read timeout:        256 seconds
     # Default Keep proxy bandwidth minimum:   32768 bytes per second
-    DEFAULT_TIMEOUT = (2, 64, 32768)
-    DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
+    DEFAULT_TIMEOUT = (2, 256, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
 
     class ThreadLimiter(object):
         """Limit the number of threads writing to Keep at once.
@@ -379,6 +379,7 @@ class KeepClient(object):
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
             curl = self._get_user_agent()
+            ok = None
             try:
                 with timer.Timer() as t:
                     self._headers = {}
@@ -410,7 +411,6 @@ class KeepClient(object):
                 self._result = {
                     'error': e,
                 }
-                ok = False
             self._usable = ok != False
             if self._result.get('status_code', None):
                 # The client worked well enough to get an HTTP status
@@ -445,6 +445,7 @@ class KeepClient(object):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
+            ok = None
             try:
                 with timer.Timer() as t:
                     self._headers = {}
@@ -486,7 +487,6 @@ class KeepClient(object):
                 self._result = {
                     'error': e,
                 }
-                ok = False
             self._usable = ok != False # still usable if ok is True or None
             if self._result.get('status_code', None):
                 # Client is functional. See comment in get().
@@ -631,7 +631,7 @@ class KeepClient(object):
           seconds. Because timeouts are often a result of transient server
           load, the actual connection timeout will be increased by a factor
           of two on each retry.
-          Default: (2, 64, 32768).
+          Default: (2, 256, 32768).
 
         :proxy_timeout:
           The initial timeout (in seconds) for HTTP requests to
@@ -639,7 +639,7 @@ class KeepClient(object):
           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
           described above for adjusting connection timeouts on retry also
           applies.
-          Default: (20, 64, 32768).
+          Default: (20, 256, 32768).
 
         :api_token:
           If you're not using an API client, but only talking
@@ -1018,6 +1018,7 @@ class KeepClient(object):
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
+        done = 0
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -1028,7 +1029,7 @@ class KeepClient(object):
                 continue
 
             thread_limiter = KeepClient.ThreadLimiter(
-                copies, self.max_replicas_per_service)
+                copies - done, self.max_replicas_per_service)
             threads = []
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
@@ -1046,7 +1047,8 @@ class KeepClient(object):
                 threads.append(t)
             for t in threads:
                 t.join()
-            loop.save_result((thread_limiter.done() >= copies, len(threads)))
+            done += thread_limiter.done()
+            loop.save_result((done >= copies, len(threads)))
 
         if loop.success():
             return thread_limiter.response()