self.confirmed_storage_classes = {}
self.response = None
self.storage_classes_tracking = True
- self.queue_data_lock = threading.Lock()
- self.pending_tries = max(copies, len(classes))+1
+ self.queue_data_lock = threading.RLock()
+ self.pending_tries = max(copies, len(classes))
self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr, classes_confirmed):
# Records the outcome of a write: updates the per-storage-class tally,
# recomputes the remaining attempt count, stores the response and notifies
# waiters on the pending-tries condition.
# NOTE(review): this view is a partial diff excerpt. The `try:` paired with
# the `except KeyError:` below, and the code that binds st_class / st_copies
# (presumably derived from classes_confirmed), are not visible here --
# confirm against the full method before relying on these notes.
# Add the newly reported copies to the running total for this class.
self.confirmed_storage_classes[st_class] += st_copies
except KeyError:
# No entry for this class yet: start its tally at the reported count.
self.confirmed_storage_classes[st_class] = st_copies
# Remaining attempts = the larger of (copies wanted minus copies already
# successful) and the length of what pending_classes() returns
# (presumably the storage classes not yet satisfied -- verify).
+ self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
self.response = response
# Acquire the condition, then wake every thread waiting on it.
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
# wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
[ 1, ['foo'], 1, 'bar=1', 200],
[ 1, ['foo'], 1, None, 503],
- [ 2, ['foo'], 1, 'bar=1, foo=1', 200],
- [ 2, ['foo, bar'], 1, 'bar=2, foo=1', 200],
+ [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
+ [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
+ [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
]
for w_copies, w_classes, c_copies, c_classes, return_code in cases:
headers = {'x-keep-replicas-stored': c_copies}
if c_classes is not None:
headers.update({'x-keep-storage-classes-confirmed': c_classes})
- with tutil.mock_keep_responses(self.locator, return_code, **headers):
+ with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
self.keep_client.put(self.data, copies=w_copies, classes=w_classes)