replicas_stored = int(result.headers['x-keep-replicas-stored'])
except (KeyError, ValueError):
replicas_stored = 1
- limiter.save_response(result.text.strip(), replicas_stored)
+ limiter.save_response(result.content.strip(), replicas_stored)
elif status is not None:
_logger.debug("Request fail: PUT %s => %s %s",
self.args['data_hash'], status,
- self.service.last_result.text)
+ self.service.last_result.content)
def __init__(self, api_client=None, proxy=None,
r['service_port'])
_logger.debug(str(self._keep_services))
- def _service_weight(self, hash, service_uuid):
+ def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
block with a known hash.
The weight is md5(h + u) where h is the data block's hash and u is
the last 15 characters of the service endpoint's UUID.
"""
- return hashlib.md5(hash + service_uuid[-15:]).hexdigest()
+ return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
- def weighted_service_roots(self, hash, force_rebuild=False):
+ def weighted_service_roots(self, data_hash, force_rebuild=False):
"""Return an array of Keep service endpoints, in the order in
which they should be probed when reading or writing data with
the given hash.
self.build_services_list(force_rebuild)
# Sort the available services by weight (heaviest first) for
- # this hash, and return their service_roots (base URIs) in
- # that order.
+ # this data_hash, and return their service_roots (base URIs)
+ # in that order.
sorted_roots = [
svc['_service_root'] for svc in sorted(
self._keep_services,
reverse=True,
- key=lambda svc: self._service_weight(hash, svc['uuid']))]
- _logger.debug(hash + ': ' + str(sorted_roots))
+ key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
+ _logger.debug(data_hash + ': ' + str(sorted_roots))
return sorted_roots
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
if loop.success():
return blob
- # No servers fulfilled the request. Count how many responded
- # "not found;" if the ratio is high enough (currently 75%), report
- # Not Found; otherwise a generic error.
+ try:
+ all_roots = local_roots + hint_roots
+ except NameError:
+ # We never successfully fetched local_roots.
+ all_roots = hint_roots
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
- not_founds = sum(1 for ks in roots_map.values()
- if ks.last_status() in set([403, 404, 410]))
- if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
- raise arvados.errors.NotFoundError(loc_s)
+ not_founds = sum(1 for key in all_roots
+ if roots_map[key].last_status() in {403, 404, 410})
+ service_errors = ((key, roots_map[key].last_result)
+ for key in all_roots)
+ if not roots_map:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}: no Keep services available ({})".format(
+ loc_s, loop.last_result()))
+ elif not_founds == len(all_roots):
+ raise arvados.errors.NotFoundError(
+ "{} not found".format(loc_s), service_errors)
else:
- raise arvados.errors.KeepReadError(loc_s)
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s), service_errors)
@retry.retry_method
def put(self, data, copies=2, num_retries=None):
exponential backoff. The default value is set when the
KeepClient is initialized.
"""
+
+ if isinstance(data, unicode):
+ data = data.encode("ascii")
+ elif not isinstance(data, str):
+ raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+
data_hash = hashlib.md5(data).hexdigest()
if copies < 1:
return data_hash
if loop.success():
return thread_limiter.response()
- raise arvados.errors.KeepWriteError(
- "Write fail for %s: wanted %d but wrote %d" %
- (data_hash, copies, thread_limiter.done()))
-
- # Local storage methods need no-op num_retries arguments to keep
- # integration tests happy. With better isolation they could
- # probably be removed again.
- def local_store_put(self, data, num_retries=0):
+ if not roots_map:
+ raise arvados.errors.KeepWriteError(
+ "failed to write {}: no Keep services available ({})".format(
+ data_hash, loop.last_result()))
+ else:
+ service_errors = ((key, roots_map[key].last_result)
+ for key in local_roots
+ if not roots_map[key].success_flag)
+ raise arvados.errors.KeepWriteError(
+ "failed to write {} (wanted {} copies but wrote {})".format(
+ data_hash, copies, thread_limiter.done()), service_errors)
+
+ def local_store_put(self, data, copies=1, num_retries=None):
+ """A stub for put().
+
+ This method is used in place of the real put() method when
+ using local storage (see constructor's local_store argument).
+
+ copies and num_retries arguments are ignored: they are here
+ only for the sake of offering the same call signature as
+ put().
+
+ Data stored this way can be retrieved via local_store_get().
+ """
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s, num_retries=0):
+ def local_store_get(self, loc_s, num_retries=None):
+ """Companion to local_store_put()."""
try:
locator = KeepLocator(loc_s)
except ValueError: