class KeepBlockCache(object):
- # Default RAM cache is 256MiB
+ # Default RAM cache is 1GiB
- def __init__(self, cache_max=(256 * 1024 * 1024)):
+ def __init__(self, cache_max=(1024 * 1024 * 1024)):
self.cache_max = cache_max
self._cache = []
self._cache_lock = threading.Lock()
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
if self.insecure:
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+ curl.setopt(pycurl.SSL_VERIFYHOST, 0)
else:
curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
if method == "HEAD":
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
if self.insecure:
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+ curl.setopt(pycurl.SSL_VERIFYHOST, 0)
else:
curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
self._setcurltimeouts(curl, timeout)
self.confirmed_storage_classes = {}
self.response = None
self.storage_classes_tracking = True
- self.queue_data_lock = threading.Lock()
- self.pending_tries = max(copies, len(classes))+1
+ self.queue_data_lock = threading.RLock()
+ self.pending_tries = max(copies, len(classes))
self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr, classes_confirmed):
self.confirmed_storage_classes[st_class] += st_copies
except KeyError:
self.confirmed_storage_classes[st_class] = st_copies
+ self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
result = service.last_result()
if not success:
- if result.get('status_code', None):
+ if result.get('status_code'):
_logger.debug("Request fail: PUT %s => %s %s",
self.data_hash,
- result['status_code'],
- result['body'])
+ result.get('status_code'),
+ result.get('body'))
raise self.TaskFailed()
_logger.debug("KeepWriterThread %s succeeded %s+%i %s",
self.get_counter = Counter()
self.hits_counter = Counter()
self.misses_counter = Counter()
+ self._storage_classes_unsupported_warning = False
+ self._default_classes = []
if local_store:
self.local_store = local_store
self._writable_services = None
self.using_proxy = None
self._static_services_list = False
+ try:
+ self._default_classes = [
+ k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
+ except KeyError:
+ # We're talking to an old cluster
+ pass
def current_timeout(self, attempt_number):
"""Return the appropriate timeout to use for this client.
else:
return None
- def get_from_cache(self, loc):
+ def get_from_cache(self, loc_s):
"""Fetch a block only if is in the cache, otherwise return None."""
- slot = self.block_cache.get(loc)
+ locator = KeepLocator(loc_s)
+ slot = self.block_cache.get(locator.md5sum)
if slot is not None and slot.ready.is_set():
return slot.get()
else:
return None
+ def has_cache_slot(self, loc_s):
+ locator = KeepLocator(loc_s)
+ return self.block_cache.get(locator.md5sum) is not None
+
def refresh_signature(self, loc):
"""Ask Keep to get the remote block and return its local signature"""
now = datetime.datetime.utcnow().isoformat("T") + 'Z'
self.get_counter.add(1)
+ request_id = (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id())
+ if headers is None:
+ headers = {}
+ headers['X-Request-Id'] = request_id
+
slot = None
blob = None
try:
self.misses_counter.add(1)
- if headers is None:
- headers = {}
- headers['X-Request-Id'] = (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id())
-
# If the locator has hints specifying a prefix (indicating a
# remote keepproxy) or the UUID of a local gateway service,
# read data from the indicated service(s) instead of the usual
for key in sorted_roots)
if not roots_map:
raise arvados.errors.KeepReadError(
- "failed to read {}: no Keep services available ({})".format(
- loc_s, loop.last_result()))
+ "[{}] failed to read {}: no Keep services available ({})".format(
+ request_id, loc_s, loop.last_result()))
elif not_founds == len(sorted_roots):
raise arvados.errors.NotFoundError(
- "{} not found".format(loc_s), service_errors)
+ "[{}] {} not found".format(request_id, loc_s), service_errors)
else:
raise arvados.errors.KeepReadError(
- "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
+ "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
@retry.retry_method
- def put(self, data, copies=2, num_retries=None, request_id=None, classes=[]):
+ def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
be written.
"""
+ classes = classes or self._default_classes
+
if not isinstance(data, bytes):
data = data.encode()
return loc_s
locator = KeepLocator(loc_s)
+ request_id = (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id())
headers = {
- 'X-Request-Id': (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id()),
+ 'X-Request-Id': request_id,
'X-Keep-Desired-Replicas': str(copies),
}
roots_map = {}
# success is determined only by successful copies.
#
# Disable storage classes tracking from this point forward.
- _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+ if not self._storage_classes_unsupported_warning:
+ self._storage_classes_unsupported_warning = True
+ _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
done_classes = None
loop.save_result(
(done_copies >= copies, writer_pool.total_task_nr))
return writer_pool.response()
if not roots_map:
raise arvados.errors.KeepWriteError(
- "failed to write {}: no Keep services available ({})".format(
- data_hash, loop.last_result()))
+ "[{}] failed to write {}: no Keep services available ({})".format(
+ request_id, data_hash, loop.last_result()))
else:
service_errors = ((key, roots_map[key].last_result()['error'])
for key in sorted_roots
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
- "failed to write {} after {} (wanted {} copies but wrote {})".format(
- data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
+ "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
+ request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
"""A stub for put().
if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
return True
- def is_cached(self, locator):
- return self.block_cache.reserve_cache(expect_hash)