self.confirmed_storage_classes = {}
self.response = None
self.storage_classes_tracking = True
- self.queue_data_lock = threading.Lock()
- self.pending_tries = max(copies, len(classes))+1
+ self.queue_data_lock = threading.RLock()
+ self.pending_tries = max(copies, len(classes))
self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr, classes_confirmed):
self.confirmed_storage_classes[st_class] += st_copies
except KeyError:
self.confirmed_storage_classes[st_class] = st_copies
+ self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
self.get_counter = Counter()
self.hits_counter = Counter()
self.misses_counter = Counter()
+ self._storage_classes_unsupported_warning = False
if local_store:
self.local_store = local_store
# success is determined only by successful copies.
#
# Disable storage classes tracking from this point forward.
- _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+ if not self._storage_classes_unsupported_warning:
+ self._storage_classes_unsupported_warning = True
+ _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
done_classes = None
loop.save_result(
(done_copies >= copies, writer_pool.total_task_nr))
"failed to write {} after {} (wanted {} copies but wrote {})".format(
data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
- def local_store_put(self, data, copies=1, num_retries=None):
+ def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
"""A stub for put().
This method is used in place of the real put() method when