class KeepWriterQueue(queue.Queue):
    """Queue of ``(service, service_root)`` write tasks plus shared progress.

    Besides holding the tasks, the queue tracks how many copies have been
    written, how many copies each storage class has received, and how many
    more attempts ("pending tries") consumers may start right now.

    Two synchronisation objects are used:

    * ``queue_data_lock`` guards the copy / storage-class bookkeeping.
    * ``pending_tries_notification`` is the condition consumers wait on in
      ``get_next_task()``; it is notified on every success or failure.
    """

    def __init__(self, copies, classes=None):
        """Create an empty queue.

        :copies: number of copies wanted.
        :classes: iterable of wanted storage class names.  ``None`` (the
          default) is treated as "no storage classes wanted".
        """
        queue.Queue.__init__(self) # Old-style superclass
        # Keep a private list so that neither a shared default nor the
        # caller's own object is aliased, and so that None never reaches
        # len() / set() further down.
        wanted_classes = [] if classes is None else list(classes)
        self.wanted_copies = copies
        self.wanted_storage_classes = wanted_classes
        self.successful_copies = 0
        # Maps storage class name -> number of copies confirmed so far.
        self.confirmed_storage_classes = {}
        self.response = None
        # Switched off for good by the first write_success() call that
        # reports classes_confirmed=None.
        self.storage_classes_tracking = True
        self.queue_data_lock = threading.RLock()
        self.pending_tries = max(copies, len(wanted_classes))
        self.pending_tries_notification = threading.Condition()

    def write_success(self, response, replicas_nr, classes_confirmed):
        """Record a successful write and wake up every waiting consumer.

        :response: stored as-is in ``self.response`` (the last success wins).
        :replicas_nr: number of copies to add to ``successful_copies``.
        :classes_confirmed: dict mapping storage class -> confirmed copies,
          or ``None``, which disables storage class tracking.
        """
        with self.queue_data_lock:
            self.successful_copies += replicas_nr
            if classes_confirmed is None:
                self.storage_classes_tracking = False
            elif self.storage_classes_tracking:
                confirmed = self.confirmed_storage_classes
                for st_class, st_copies in classes_confirmed.items():
                    confirmed[st_class] = confirmed.get(st_class, 0) + st_copies
                # NOTE(review): the indentation of this section was lost in
                # the reviewed paste; the recomputation is placed inside the
                # tracking branch -- confirm against the upstream file.
                # NOTE(review): pending_tries is written here under
                # queue_data_lock, while write_fail()/get_next_task() change
                # it under pending_tries_notification -- confirm this is
                # intended.
                self.pending_tries = max(
                    self.wanted_copies - self.successful_copies,
                    len(self.pending_classes()))
            self.response = response
        with self.pending_tries_notification:
            self.pending_tries_notification.notify_all()

    def write_fail(self, ks):
        """Record a failed attempt: allow one more try and wake one waiter.

        :ks: not used by this method.
        """
        with self.pending_tries_notification:
            self.pending_tries += 1
            self.pending_tries_notification.notify()

    def pending_copies(self):
        """Return ``wanted_copies - successful_copies`` (may be negative)."""
        with self.queue_data_lock:
            return self.wanted_copies - self.successful_copies

    def satisfied_classes(self):
        """Return the wanted storage classes that are no longer pending.

        Returns ``None`` when storage class tracking has been disabled, so
        that the caller can tell "not tracked" apart from "none satisfied".
        The order of the returned list is unspecified (it is built from a
        set difference).
        """
        with self.queue_data_lock:
            if not self.storage_classes_tracking:
                # Notifies disabled storage classes expectation to
                # the outer loop.
                return None
            wanted = self.wanted_storage_classes or []
            return list(set(wanted) - set(self.pending_classes()))

    def pending_classes(self):
        """Return the wanted storage classes still short of ``wanted_copies``.

        Returns an empty list when tracking is disabled or when no storage
        classes are wanted.  Order follows ``wanted_storage_classes``.
        """
        with self.queue_data_lock:
            if not self.storage_classes_tracking or not self.wanted_storage_classes:
                return []
            confirmed = self.confirmed_storage_classes
            # A class stays pending until its confirmed count reaches
            # wanted_copies; classes never confirmed are always pending.
            return [
                st_class for st_class in self.wanted_storage_classes
                if st_class not in confirmed
                or confirmed[st_class] < self.wanted_copies
            ]

    def get_next_task(self):
        """Return the next ``(service, service_root)`` task to attempt.

        Blocks on ``pending_tries_notification`` while tasks remain queued
        but no further try is currently allowed.

        Raises ``queue.Empty`` when there is nothing left to do: either the
        wanted copies and classes are all satisfied (the remaining tasks are
        drained and marked done first), or the queue has run out of tasks.
        """
        with self.pending_tries_notification:
            while True:
                if self.pending_copies() < 1 and not self.pending_classes():
                    # write_success() already called notify_all() when
                    # this condition became true, so no thread should
                    # still be in wait(); notifying again is cheap
                    # insurance against deadlock.
                    self.pending_tries_notification.notify_all()
                    # Drain the queue: get_nowait() raises queue.Empty
                    # once it is exhausted, which ends this loop and
                    # propagates to the caller.
                    while True:
                        self.get_nowait()
                        self.task_done()
                elif self.pending_tries > 0:
                    service, service_root = self.get_nowait()
                    if service.finished():
                        # Skip this task without using up a try.
                        self.task_done()
                        continue
                    self.pending_tries -= 1
                    return service, service_root
                elif self.empty():
                    self.pending_tries_notification.notify_all()
                    raise queue.Empty
                else:
                    self.pending_tries_notification.wait()
+
+
class KeepWriterThreadPool(object):
    """Set of writer threads that all consume one ``KeepWriterQueue``.

    Tasks are registered with ``add_task()``; ``join()`` starts the threads
    and waits for the queue to be fully processed; ``done()`` and
    ``response()`` then report what the queue recorded.
    """

    def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
        """Create the shared queue and the (not yet started) workers.

        One worker is enough when ``max_service_replicas`` is falsy or is at
        least ``copies``; otherwise ``ceil(copies / max_service_replicas)``
        workers are created.  ``data``, ``data_hash`` and ``timeout`` are
        handed unchanged to each ``KeepClient.KeepWriterThread``, while
        ``copies`` and ``classes`` go to the ``KeepClient.KeepWriterQueue``.
        """
        self.total_task_nr = 0
        if max_service_replicas and max_service_replicas < copies:
            thread_count = int(math.ceil(1.0*copies/max_service_replicas))
        else:
            thread_count = 1
        _logger.debug("Pool max threads is %d", thread_count)
        self.queue = KeepClient.KeepWriterQueue(copies, classes)
        # Workers are only constructed here; join() is what starts them.
        self.workers = [
            KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
            for _ in range(thread_count)
        ]

    def add_task(self, ks, service_root):
        """Enqueue the ``(ks, service_root)`` pair and count it."""
        task = (ks, service_root)
        self.queue.put(task)
        self.total_task_nr += 1

    def done(self):
        """Return ``(successful_copies, satisfied_classes)`` from the queue."""
        shared_queue = self.queue
        return shared_queue.successful_copies, shared_queue.satisfied_classes()

    def join(self):
        """Start every worker, then block until ``queue.join()`` returns."""
        for writer in self.workers:
            writer.start()
        self.queue.join()

    def response(self):
        """Return the ``response`` attribute currently held by the queue."""
        return self.queue.response
+
+