class KeepWriterQueue(queue.Queue):
def __init__(self, copies, classes=None):
    """Initialize the queue's write-tracking state.

    Arguments:
    * copies: Number of copies that must be written successfully.
      Also used as the initial number of pending tries.
    * classes: An optional iterable of storage class names in which
      copies should be confirmed. Defaults to no storage class
      requirement.
    """
    queue.Queue.__init__(self) # Old-style superclass
    self.wanted_copies = copies
    # Keep a private list: a mutable default argument (or the caller's
    # own list) must not end up shared between queue instances.
    self.wanted_storage_classes = [] if classes is None else list(classes)
    self.successful_copies = 0
    # Maps storage class name -> number of copies confirmed so far.
    self.confirmed_storage_classes = {}
    self.response = None
    # Guards the counters and response updated by write_success().
    self.queue_data_lock = threading.Lock()
    self.pending_tries = copies
    self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr, classes_confirmed):
    """Record a successful write and wake all waiting threads.

    Arguments:
    * response: Stored as the queue's most recent response.
    * replicas_nr: Number of copies to add to the success count.
    * classes_confirmed: Dict mapping storage class name to the number
      of copies confirmed; each entry is added to the running totals.
    """
    with self.queue_data_lock:
        self.successful_copies += replicas_nr
        totals = self.confirmed_storage_classes
        for class_name, count in classes_confirmed.items():
            if class_name in totals:
                totals[class_name] += count
            else:
                # First confirmation seen for this storage class.
                totals[class_name] = count
        self.response = response
    # Waiters re-check the pending counts once they are notified.
    with self.pending_tries_notification:
        self.pending_tries_notification.notify_all()
self.pending_tries_notification.notify()
def pending_copies(self):
    """Return how many wanted copies have not been written yet.

    The value is read under the queue data lock. It is zero or
    negative once at least the wanted number of copies succeeded.
    """
    with self.queue_data_lock:
        still_needed = self.wanted_copies - self.successful_copies
    return still_needed
def pending_classes(self):
    """Return the wanted storage classes that still lack enough copies.

    A wanted class is unsatisfied while its confirmed copy count is
    below the wanted number of copies. A class with no confirmation at
    all counts as zero copies, so it is reported as unsatisfied rather
    than silently skipped.

    Returns a new list, in the order of the wanted storage classes.
    It is empty when no storage classes were requested.
    """
    # NOTE(review): if a server never reports confirmed storage classes,
    # the wanted classes stay pending here -- confirm this is the
    # desired handling for such servers.
    with self.queue_data_lock:
        confirmed = self.confirmed_storage_classes
        return [
            st_class
            for st_class in (self.wanted_storage_classes or ())
            if confirmed.get(st_class, 0) < self.wanted_copies
        ]
+
def get_next_task(self):
with self.pending_tries_notification:
while True:
- if self.pending_copies() < 1:
+ if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
# This notify_all() is unnecessary --
# write_success() already called notify_all()
# when pending<1 became true, so it's not
while True:
self.get_nowait()
self.task_done()
- elif self.pending_tries > 0:
+ elif self.pending_tries > 0 or len(self.pending_classes()) > 0:
service, service_root = self.get_nowait()
if service.finished():
self.task_done()
class KeepWriterThreadPool(object):
- def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+ def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
self.total_task_nr = 0
- self.wanted_copies = copies
if (not max_service_replicas) or (max_service_replicas >= copies):
num_threads = 1
else:
num_threads = int(math.ceil(1.0*copies/max_service_replicas))
_logger.debug("Pool max threads is %d", num_threads)
self.workers = []
- self.queue = KeepClient.KeepWriterQueue(copies)
+ self.queue = KeepClient.KeepWriterQueue(copies, classes)
# Create workers
for _ in range(num_threads):
w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
except queue.Empty:
return
try:
- locator, copies = self.do_task(service, service_root)
+ locator, copies, classes = self.do_task(service, service_root)
except Exception as e:
if not isinstance(e, self.TaskFailed):
_logger.exception("Exception in KeepWriterThread")
self.queue.write_fail(service)
else:
- self.queue.write_success(locator, copies)
+ self.queue.write_success(locator, copies, classes)
finally:
self.queue.task_done()
except (KeyError, ValueError):
replicas_stored = 1
- return result['body'].strip(), replicas_stored
+ classes_confirmed = {}
+ try:
+ scch = result['headers']['x-keep-storage-classes-confirmed']
+ for confirmation in scch.replace(' ', '').split(','):
+ if '=' in confirmation:
+ stored_class, stored_copies = confirmation.split('=')[:2]
+ classes_confirmed[stored_class] = int(stored_copies)
+ except (KeyError, ValueError):
+ # Storage classes confirmed header missing or corrupt
+ classes_confirmed = {}
+
+ return result['body'].strip(), replicas_stored, classes_confirmed
def __init__(self, api_client=None, proxy=None,
"failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
@retry.retry_method
- def put(self, data, copies=2, num_retries=None, request_id=None):
+ def put(self, data, copies=2, num_retries=None, request_id=None, classes=[]):
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
*each* Keep server if it returns temporary failures, with
exponential backoff. The default value is set when the
KeepClient is initialized.
+ * classes: An optional list of storage class names where copies should
+ be written.
"""
if not isinstance(data, bytes):
arvados.util.new_request_id()),
'X-Keep-Desired-Replicas': str(copies),
}
+ if len(classes) > 0:
+ headers['X-Keep-Storage-Classes'] = ', '.join(classes)
roots_map = {}
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
data_hash=data_hash,
copies=copies - done,
max_service_replicas=self.max_replicas_per_service,
- timeout=self.current_timeout(num_retries - tries_left))
+ timeout=self.current_timeout(num_retries - tries_left),
+ classes=classes)
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
if ks.finished():