+
+
+ class KeepWriterQueue(Queue.Queue):
+ def __init__(self, copies):
+ Queue.Queue.__init__(self) # Old-style superclass
+ self.wanted_copies = copies
+ self.successful_copies = 0
+ self.response = None
+ self.successful_copies_lock = threading.Lock()
+ self.pending_tries = copies
+ self.pending_tries_notification = threading.Condition()
+
+ def write_success(self, response, replicas_nr):
+ with self.successful_copies_lock:
+ self.successful_copies += replicas_nr
+ self.response = response
+
+ def write_fail(self, ks, status_code):
+ with self.pending_tries_notification:
+ self.pending_tries += 1
+ self.pending_tries_notification.notify()
+
+ def pending_copies(self):
+ with self.successful_copies_lock:
+ return self.wanted_copies - self.successful_copies
+
+
+ class KeepWriterThreadPool(object):
+ def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+ self.total_task_nr = 0
+ self.wanted_copies = copies
+ if (not max_service_replicas) or (max_service_replicas >= copies):
+ num_threads = 1
+ else:
+ num_threads = int(math.ceil(float(copies) / max_service_replicas))
+ _logger.debug("Pool max threads is %d", num_threads)
+ self.workers = []
+ self.queue = KeepClient.KeepWriterQueue(copies)
+ # Create workers
+ for _ in range(num_threads):
+ w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
+ self.workers.append(w)
+
+ def add_task(self, ks, service_root):
+ self.queue.put((ks, service_root))
+ self.total_task_nr += 1
+
+ def done(self):
+ return self.queue.successful_copies
+
+ def join(self):
+ # Start workers
+ for worker in self.workers:
+ worker.start()
+ # Wait for finished work
+ self.queue.join()
+ with self.queue.pending_tries_notification:
+ self.queue.pending_tries_notification.notify_all()
+ for worker in self.workers:
+ worker.join()
+
+ def response(self):
+ return self.queue.response
+
+