class ThreadLimiter(object):
    """Limit the number of threads writing to Keep at once.

    This ensures that only a number of writer threads that could
    potentially achieve the desired replication level run at once.
    Once the desired replication level is achieved, queued threads
    are instructed not to run.

    A slot taken in ``__enter__`` is only handed back in ``__exit__``
    when a failure has been recorded (so a replacement writer can run)
    or when the desired number of copies has been reached (so queued
    threads can wake up, see that no work is left, and finish).

    Should be used in a "with" block.
    """

    def __init__(self, want_copies, max_service_replicas):
        """Set up the counters and locks for one write operation.

        want_copies -- number of replicas the caller wants stored.
        max_service_replicas -- the most replicas a single service can
            store per request; a false value (None, 0) results in a
            single writer thread at a time.
        """
        self._started = 0
        self._want_copies = want_copies
        self._done = 0
        self._thread_failures = 0
        self._response = None
        self._start_lock = threading.Condition()
        max_threads = self._max_writer_threads(want_copies,
                                               max_service_replicas)
        _logger.debug("Limiter max threads is %d", max_threads)
        self._todo_lock = threading.Semaphore(max_threads)
        self._done_lock = threading.Lock()
        self._thread_failures_lock = threading.Lock()
        self._local = threading.local()

    @staticmethod
    def _max_writer_threads(want_copies, max_service_replicas):
        """Return how many writer threads may hold a slot at once."""
        if (not max_service_replicas) or (max_service_replicas >= want_copies):
            return 1
        # int(): math.ceil() returns a float on Python 2, and the
        # semaphore counter should be an integer.
        return int(math.ceil(float(want_copies) / max_service_replicas))

    def __enter__(self):
        """Wait for this thread's turn and for a free slot.

        Returns the limiter itself.
        """
        # The "with" block guarantees the condition's lock is released
        # even if waiting or acquiring the semaphore is interrupted.
        with self._start_lock:
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            # Taken while still holding the condition's lock, so threads
            # entering after this one queue up behind it.
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notify_all()
        return self

    def __exit__(self, type, value, traceback):
        """Hand back a slot if a failure was recorded or work is done."""
        with self._thread_failures_lock:
            if self._thread_failures > 0:
                # A recorded failure frees one slot for a replacement.
                self._thread_failures -= 1
                self._todo_lock.release()

        # If work is finished, release all pending threads: each exiting
        # thread frees one slot, which lets the next queued thread through.
        if not self.shall_i_proceed():
            self._todo_lock.release()

    def set_sequence(self, sequence):
        """Make the calling thread wait in ``__enter__`` until
        ``sequence`` threads have started. Stored per thread.
        """
        self._local.sequence = sequence

    def shall_i_proceed(self):
        """
        Return true if the current thread should write to Keep.
        Return false otherwise.
        """
        with self._done_lock:
            return (self._done < self._want_copies)

    def save_response(self, response_body, replicas_stored):
        """
        Records a response body (a locator, possibly signed) returned by
        the Keep server, and the number of replicas it stored.

        A replicas_stored of 0 is counted as a failure instead; the
        response body is not saved in that case.
        """
        if replicas_stored == 0:
            # Failure notification, should start a new thread to try to
            # reach full replication
            with self._thread_failures_lock:
                self._thread_failures += 1
        else:
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

    def response(self):
        """Return the body from the response to a PUT request."""
        with self._done_lock:
            return self._response

    def done(self):
        """Return the total number of replicas successfully stored."""
        with self._done_lock:
            return self._done