import datetime
import hashlib
import logging
+import math
import os
import pycurl
import Queue
DEFAULT_PROXY_TIMEOUT = (20, 300)
class ThreadLimiter(object):
- """
- Limit the number of threads running at a given time to
- {desired successes} minus {successes reported}. When successes
- reported == desired, wake up the remaining threads and tell
- them to quit.
+ """Limit the number of threads writing to Keep at once.
+
+ This ensures that only a number of writer threads that could
+ potentially achieve the desired replication level run at once.
+ Once the desired replication level is achieved, queued threads
+ are instructed not to run.
Should be used in a "with" block.
"""
- def __init__(self, todo):
+ def __init__(self, want_copies, max_service_replicas):
self._started = 0
- self._todo = todo
+ self._want_copies = want_copies
self._done = 0
self._response = None
self._start_lock = threading.Condition()
- self._todo_lock = threading.Semaphore(todo)
+ if (not max_service_replicas) or (max_service_replicas >= want_copies):
+ max_threads = 1
+ else:
+ max_threads = math.ceil(float(want_copies) / max_service_replicas)
+ _logger.debug("Limiter max threads is %d", max_threads)
+ self._todo_lock = threading.Semaphore(max_threads)
self._done_lock = threading.Lock()
self._local = threading.local()
def shall_i_proceed(self):
"""
- Return true if the current thread should do stuff. Return
- false if the current thread should just stop.
+ Return true if the current thread should write to Keep.
+ Return false otherwise.
"""
with self._done_lock:
- return (self._done < self._todo)
+ return (self._done < self._want_copies)
def save_response(self, response_body, replicas_stored):
"""
Records a response body (a locator, possibly signed) returned by
- the Keep server. It is not necessary to save more than
- one response, since we presume that any locator returned
- in response to a successful request is valid.
+ the Keep server, and the number of replicas it stored.
"""
with self._done_lock:
self._done += replicas_stored
self._response = response_body
def response(self):
- """
- Returns the body from the response to a PUT request.
- """
+ """Return the body from the response to a PUT request."""
with self._done_lock:
return self._response
def done(self):
- """
- Return how many successes were reported.
- """
+ """Return the total number of replicas successfully stored."""
with self._done_lock:
return self._done
self.put = self.local_store_put
else:
self.num_retries = num_retries
+ self.max_replicas_per_service = None
if proxy:
if not proxy.endswith('/'):
proxy += '/'
self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
- self.max_replicas_per_service = None
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
self._writable_services = None
self.using_proxy = None
self._static_services_list = False
- self.max_replicas_per_service = 1
def current_timeout(self, attempt_number):
"""Return the appropriate timeout to use for this client.
# Tell the proxy how many copies we want it to store
headers['X-Keep-Desired-Replication'] = str(copies)
roots_map = {}
- thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
- thread_sequence = 0
for tries_left in loop:
try:
sorted_roots = self.map_new_services(
loop.save_result(error)
continue
+ thread_limiter = KeepClient.ThreadLimiter(
+ copies, self.max_replicas_per_service)
threads = []
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
service_root=service_root,
thread_limiter=thread_limiter,
timeout=self.current_timeout(num_retries-tries_left),
- thread_sequence=thread_sequence)
+ thread_sequence=len(threads))
t.start()
threads.append(t)
- thread_sequence += 1
for t in threads:
t.join()
loop.save_result((thread_limiter.done() >= copies, len(threads)))