from __future__ import absolute_import
from __future__ import division
+import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
return None
return self._result['body']
- def put(self, hash_s, body, timeout=None):
+ def put(self, hash_s, body, timeout=None, headers={}):
+ put_headers = copy.copy(self.put_headers)
+ put_headers.update(headers)
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
curl = self._get_user_agent()
curl.setopt(pycurl.INFILESIZE, len(body))
curl.setopt(pycurl.READFUNCTION, body_reader.read)
curl.setopt(pycurl.HTTPHEADER, [
- '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
+ '{}: {}'.format(k,v) for k,v in put_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
if self.insecure:
self.successful_copies = 0
self.confirmed_storage_classes = {}
self.response = None
+ self.storage_classes_tracking = True
self.queue_data_lock = threading.Lock()
- self.pending_tries = copies
+ self.pending_tries = max(copies, len(classes))
self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr, classes_confirmed):
with self.queue_data_lock:
self.successful_copies += replicas_nr
- for st_class, st_copies in classes_confirmed.items():
- try:
- self.confirmed_storage_classes[st_class] += st_copies
- except KeyError:
- self.confirmed_storage_classes[st_class] = st_copies
+ if classes_confirmed is None:
+ self.storage_classes_tracking = False
+ elif self.storage_classes_tracking:
+ for st_class, st_copies in classes_confirmed.items():
+ try:
+ self.confirmed_storage_classes[st_class] += st_copies
+ except KeyError:
+ self.confirmed_storage_classes[st_class] = st_copies
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
with self.queue_data_lock:
return self.wanted_copies - self.successful_copies
+ def satisfied_classes(self):
+     """Return the wanted storage classes that are already satisfied.
+
+     Returns None when storage class tracking has been disabled (a
+     successful write was recorded without usable class confirmation),
+     which tells the outer retry loop to stop expecting storage
+     classes. Otherwise returns a list with every wanted class that
+     pending_classes() no longer reports; the list is empty when no
+     classes were requested.
+     """
+     with self.queue_data_lock:
+         if not self.storage_classes_tracking:
+             # Signal to the outer loop that storage class
+             # expectations have been dropped.
+             return None
+     # pending_classes() acquires queue_data_lock itself, and that lock
+     # is created as a plain threading.Lock (not reentrant), so it must
+     # only be called after the lock above has been released.
+     #
+     # wanted_storage_classes may be None (pending_classes() handles
+     # that case explicitly); treat it as "no classes wanted" rather
+     # than letting set(None) raise TypeError.
+     return list(set(self.wanted_storage_classes or []) - set(self.pending_classes()))
+
def pending_classes(self):
+ """Return the wanted storage classes still lacking enough confirmed copies.
+
+ A class is dropped from the result once its confirmed copy count
+ reaches wanted_copies; wanted classes that have not been confirmed
+ at all stay in the result. Returns an empty list when
+ wanted_storage_classes is None.
+ """
with self.queue_data_lock:
- unsatisfied_classes = []
+ if self.wanted_storage_classes is None:
+ return []
+ # Start from a copy of every wanted class and strike out the
+ # satisfied ones, so never-confirmed classes remain pending and
+ # the wanted list itself is not mutated.
+ unsatisfied_classes = copy.copy(self.wanted_storage_classes)
for st_class, st_copies in self.confirmed_storage_classes.items():
- if st_class in self.wanted_storage_classes and st_copies < self.wanted_copies:
- unsatisfied_classes.append(st_class)
+ if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
+ unsatisfied_classes.remove(st_class)
return unsatisfied_classes
def get_next_task(self):
while True:
self.get_nowait()
self.task_done()
- elif self.pending_tries > 0 or len(self.pending_classes()) > 0:
+ elif self.pending_tries > 0:
service, service_root = self.get_nowait()
if service.finished():
self.task_done()
self.total_task_nr += 1
def done(self):
+ """Return a (successful_copies, satisfied_classes) tuple for this pool.
+
+ The first item is the queue's successful copy count; the second is
+ whatever the queue's satisfied_classes() returns, which is None
+ when storage class tracking has been disabled.
+ """
- return self.queue.successful_copies
+ return self.queue.successful_copies, self.queue.satisfied_classes()
def join(self):
# Start workers
self.queue.task_done()
def do_task(self, service, service_root):
+ classes = self.queue.pending_classes()
+ headers = {}
+ if len(classes) > 0:
+ headers['X-Keep-Storage-Classes'] = ', '.join(classes)
success = bool(service.put(self.data_hash,
self.data,
- timeout=self.timeout))
+ timeout=self.timeout,
+ headers=headers))
result = service.last_result()
if not success:
classes_confirmed[stored_class] = int(stored_copies)
except (KeyError, ValueError):
# Storage classes confirmed header missing or corrupt
- classes_confirmed = {}
+ classes_confirmed = None
return result['body'].strip(), replicas_stored, classes_confirmed
arvados.util.new_request_id()),
'X-Keep-Desired-Replicas': str(copies),
}
- if len(classes) > 0:
- headers['X-Keep-Storage-Classes'] = ', '.join(classes)
roots_map = {}
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
- done = 0
+ done_copies = 0
+ done_classes = []
for tries_left in loop:
try:
sorted_roots = self.map_new_services(
loop.save_result(error)
continue
+ pending_classes = []
+ if done_classes is not None:
+ pending_classes = list(set(classes) - set(done_classes))
writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
- copies=copies - done,
+ copies=copies - done_copies,
max_service_replicas=self.max_replicas_per_service,
timeout=self.current_timeout(num_retries - tries_left),
- classes=classes)
+ classes=pending_classes)
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
if ks.finished():
continue
writer_pool.add_task(ks, service_root)
writer_pool.join()
- done += writer_pool.done()
- loop.save_result((done >= copies, writer_pool.total_task_nr))
+ pool_copies, pool_classes = writer_pool.done()
+ done_copies += pool_copies
+ if (done_classes is not None) and (pool_classes is not None):
+ done_classes += pool_classes
+ loop.save_result(
+ (done_copies >= copies and set(done_classes) == set(classes),
+ writer_pool.total_task_nr))
+ else:
+ # Old keepstore contacted without storage classes support:
+ # success is determined only by successful copies.
+ #
+ # Disable storage classes tracking from this point forward.
+ _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+ done_classes = None
+ loop.save_result(
+ (done_copies >= copies, writer_pool.total_task_nr))
if loop.success():
return writer_pool.response()
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} after {} (wanted {} copies but wrote {})".format(
- data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
+ data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None):
"""A stub for put().
def test_KeepLongBinaryRWTest(self):
blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
- for i in range(0,23):
+ for i in range(0, 23):
blob_data = blob_data + blob_data
blob_locator = self.keep_client.put(blob_data)
self.assertRegex(
self._result = {}
self._result['headers'] = {}
self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+ self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
self._result['body'] = 'foobar'
- def put(self, data_hash, data, timeout):
+ def put(self, data_hash, data, timeout, headers):
time.sleep(self.delay)
if self.will_raise is not None:
raise self.will_raise
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_only_write_enough_on_partial_success(self):
for i in range(5):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_only_write_enough_when_some_crash(self):
for i in range(5):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_fail_when_too_many_crash(self):
for i in range(self.copies+1):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies-1)
+ self.assertEqual(self.pool.done(), (self.copies-1, []))
@tutil.skip_sleep