import datetime
import hashlib
import logging
+import math
import os
import pycurl
import Queue
# Default timeout setting used for proxy requests.
# NOTE(review): presumably a (connect, read) pair in seconds -- confirm
# against the code that consumes this constant.
DEFAULT_PROXY_TIMEOUT = (20, 300)
class ThreadLimiter(object):
    """Limit the number of threads writing to Keep at once.

    This ensures that only a number of writer threads that could
    potentially achieve the desired replication level run at once.
    Once the desired replication level is achieved, queued threads
    are instructed not to run.

    Should be used in a "with" block.
    """
    # NOTE(review): the context-manager methods implied by the docstring
    # are not visible in this excerpt; _started, _start_lock, _todo_lock
    # and _local are only initialised here -- confirm their use there.

    @staticmethod
    def _max_threads(want_copies, max_service_replicas):
        """Return how many writer threads may run concurrently.

        One thread is enough when max_service_replicas is false (None
        or 0) or is at least want_copies.  Otherwise the result is
        want_copies divided by max_service_replicas, rounded up.
        """
        if (not max_service_replicas) or (max_service_replicas >= want_copies):
            return 1
        # Integer ceiling division: keeps the semaphore counter an int
        # (math.ceil returns a float on Python 2) and needs no import.
        return -(-want_copies // max_service_replicas)

    def __init__(self, want_copies, max_service_replicas):
        """Initialise the counters and locks for one write operation.

        want_copies -- replica count at which shall_i_proceed() starts
        returning false.
        max_service_replicas -- used with want_copies to size the
        semaphore of concurrently running threads; may be None or 0.
        """
        self._started = 0
        self._want_copies = want_copies
        self._done = 0
        self._response = None
        self._start_lock = threading.Condition()
        max_threads = self._max_threads(want_copies, max_service_replicas)
        _logger.debug("Limiter max threads is %d", max_threads)
        self._todo_lock = threading.Semaphore(max_threads)
        self._done_lock = threading.Lock()
        self._local = threading.local()

    def shall_i_proceed(self):
        """
        Return true if the current thread should write to Keep.
        Return false otherwise.
        """
        with self._done_lock:
            # Proceed only while fewer replicas than wanted are recorded.
            return (self._done < self._want_copies)

    def save_response(self, response_body, replicas_stored):
        """
        Records a response body (a locator, possibly signed) returned by
        the Keep server, and the number of replicas it stored.
        """
        with self._done_lock:
            # Replica counts accumulate; only the latest body is kept.
            self._done += replicas_stored
            self._response = response_body

    def response(self):
        """Return the body from the response to a PUT request."""
        with self._done_lock:
            return self._response

    def done(self):
        """Return the total number of replicas successfully stored."""
        with self._done_lock:
            return self._done
loop.save_result(error)
continue
- thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
+ thread_limiter = KeepClient.ThreadLimiter(
+ copies, self.max_replicas_per_service)
threads = []
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]: