self.permission_hint()] + self.hints
if s is not None)
+    def stripped(self):
+        """Return the locator reduced to its hash and size, without hints.
+
+        The result is "<md5sum>+<size>".  If this locator has no size
+        (self.size is None), only the md5sum is returned, because "%i"
+        cannot format None.
+        """
+        if self.size is None:
+            return self.md5sum
+        return "%s+%i" % (self.md5sum, self.size)
+
def _make_hex_prop(name, length):
# Build and return a new property with the given name that
# must be a hex string of the given length.
break
sm = sum([slot.size() for slot in self._cache])
+    def _get(self, locator):
+        """Return the cache slot whose locator equals `locator`, or None.
+
+        A slot that is found anywhere but the head of self._cache is moved
+        to the head before being returned.  This method does no locking of
+        its own; get() and reserve_cache() call it while holding
+        self._cache_lock.
+        """
+        for index, slot in enumerate(self._cache):
+            if slot.locator == locator:
+                if index > 0:
+                    # move it to the front
+                    del self._cache[index]
+                    self._cache.insert(0, slot)
+                return slot
+        return None
+
+    def get(self, locator):
+        """Look up `locator` while holding the cache lock.
+
+        Returns whatever _get() returns: the matching slot, or None when
+        the locator has no slot.  No slot is created by this call.
+        """
+        with self._cache_lock:
+            slot = self._get(locator)
+        return slot
+
def reserve_cache(self, locator):
'''Reserve a cache slot for the specified locator,
or return the existing slot.'''
with self._cache_lock:
- # Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n, False
-
- # Add a new cache slot for the locator
- n = KeepBlockCache.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
+            slot = self._get(locator)
+            if slot is not None:
+                # _get() reports a miss with None, so test for exactly
+                # that instead of relying on the slot object's truthiness.
+                # False tells the caller the slot already existed.
+                return slot, False
+            # Add a new cache slot for the locator
+            slot = KeepBlockCache.CacheSlot(locator)
+            self._cache.insert(0, slot)
+            return slot, True
class KeepClient(object):
HTTP_ERRORS = (requests.exceptions.RequestException,
socket.error, ssl.SSLError)
- def __init__(self, root, **headers):
+ def __init__(self, root, session, **headers):
self.root = root
self.last_result = None
self.success_flag = None
+ self.session = session
self.get_headers = {'Accept': 'application/octet-stream'}
self.get_headers.update(headers)
self.put_headers = headers
_logger.debug("Request: GET %s", url)
try:
with timer.Timer() as t:
- result = requests.get(url.encode('utf-8'),
+ result = self.session.get(url.encode('utf-8'),
headers=self.get_headers,
timeout=timeout)
except self.HTTP_ERRORS as e:
content = result.content
_logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
self.last_status(), len(content), t.msecs,
- (len(content)/(1024.0*1024))/t.secs)
+ (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
if self.success_flag:
resp_md5 = hashlib.md5(content).hexdigest()
if resp_md5 == locator.md5sum:
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
try:
- result = requests.put(url.encode('utf-8'),
+ result = self.session.put(url.encode('utf-8'),
data=body,
headers=self.put_headers,
timeout=timeout)
def run_with_limiter(self, limiter):
if self.service.finished():
return
- _logger.debug("KeepWriterThread %s proceeding %s %s",
+ _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
str(threading.current_thread()),
self.args['data_hash'],
+ len(self.args['data']),
self.args['service_root'])
self._success = bool(self.service.put(
self.args['data_hash'],
status = self.service.last_status()
if self._success:
result = self.service.last_result
- _logger.debug("KeepWriterThread %s succeeded %s %s",
+ _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
str(threading.current_thread()),
self.args['data_hash'],
+ len(self.args['data']),
self.args['service_root'])
# Tick the 'done' counter for the number of replica
# reported stored by the server, for the case that
def __init__(self, api_client=None, proxy=None,
timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
api_token=None, local_store=None, block_cache=None,
- num_retries=0):
+ num_retries=0, session=None):
"""Initialize a new KeepClient.
Arguments:
self.put = self.local_store_put
else:
self.num_retries = num_retries
+ self.session = session if session is not None else requests.Session()
if proxy:
if not proxy.endswith('/'):
proxy += '/'
local_roots = self.weighted_service_roots(md5_s, force_rebuild)
for root in local_roots:
if root not in roots_map:
- roots_map[root] = self.KeepService(root, **headers)
+ roots_map[root] = self.KeepService(root, self.session, **headers)
return local_roots
@staticmethod
return None
@retry.retry_method
- def get(self, loc_s, num_retries=None):
+ def get(self, loc_s, num_retries=None, cache_only=False):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
to fetch data from every available Keep service, along with any
that are named in location hints in the locator. The default value
is set when the KeepClient is initialized.
+ * cache_only: If true, return the block data only if already present in
+ cache, otherwise return None.
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
+        if cache_only:
+            # block_cache.get() returns None when the block has no cache
+            # slot at all, so rule that out before touching slot.ready.
+            # A slot that exists but is not yet ready also counts as
+            # "not cached".
+            slot = self.block_cache.get(expect_hash)
+            if slot is not None and slot.ready.is_set():
+                return slot.get()
+            return None
+
slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
for hint in locator.hints if hint.startswith('K@')]
# Map root URLs their KeepService objects.
- roots_map = {root: self.KeepService(root) for root in hint_roots}
+ roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
blob = None
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
if loop.success():
return blob
- # No servers fulfilled the request. Count how many responded
- # "not found;" if the ratio is high enough (currently 75%), report
- # Not Found; otherwise a generic error.
+ try:
+ all_roots = local_roots + hint_roots
+ except NameError:
+ # We never successfully fetched local_roots.
+ all_roots = hint_roots
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
- not_founds = sum(1 for ks in roots_map.values()
- if ks.last_status() in set([403, 404, 410]))
- if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
- raise arvados.errors.NotFoundError(loc_s)
+ not_founds = sum(1 for key in all_roots
+ if roots_map[key].last_status() in {403, 404, 410})
+ service_errors = ((key, roots_map[key].last_result)
+ for key in all_roots)
+ if not roots_map:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}: no Keep services available ({})".format(
+ loc_s, loop.last_result()))
+ elif not_founds == len(all_roots):
+ raise arvados.errors.NotFoundError(
+ "{} not found".format(loc_s), service_errors)
else:
- raise arvados.errors.KeepReadError(loc_s)
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s), service_errors)
@retry.retry_method
def put(self, data, copies=2, num_retries=None):
if loop.success():
return thread_limiter.response()
- raise arvados.errors.KeepWriteError(
- "Write fail for %s: wanted %d but wrote %d" %
- (data_hash, copies, thread_limiter.done()))
-
- # Local storage methods need no-op num_retries arguments to keep
- # integration tests happy. With better isolation they could
- # probably be removed again.
- def local_store_put(self, data, num_retries=0):
+ if not roots_map:
+ raise arvados.errors.KeepWriteError(
+ "failed to write {}: no Keep services available ({})".format(
+ data_hash, loop.last_result()))
+ else:
+ service_errors = ((key, roots_map[key].last_result)
+ for key in local_roots
+ if not roots_map[key].success_flag)
+ raise arvados.errors.KeepWriteError(
+ "failed to write {} (wanted {} copies but wrote {})".format(
+ data_hash, copies, thread_limiter.done()), service_errors)
+
+ def local_store_put(self, data, copies=1, num_retries=None):
+ """A stub for put().
+
+ This method is used in place of the real put() method when
+ using local storage (see constructor's local_store argument).
+
+ copies and num_retries arguments are ignored: they are here
+ only for the sake of offering the same call signature as
+ put().
+
+ Data stored this way can be retrieved via local_store_get().
+ """
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s, num_retries=0):
+ def local_store_get(self, loc_s, num_retries=None):
+ """Companion to local_store_put()."""
try:
locator = KeepLocator(loc_s)
except ValueError:
return ''
with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
return f.read()
+
+    def is_cached(self, locator):
+        """Return True if the block named by `locator` is ready in the cache.
+
+        `locator` is assumed to be a locator string of the kind get()
+        accepts; it is parsed with KeepLocator and looked up by md5sum,
+        the same key get() uses.
+
+        The lookup goes through block_cache.get() rather than
+        reserve_cache(): reserving would insert an empty slot merely
+        because someone asked, and a later get() for the same block would
+        then treat that slot as somebody else's pending fetch.
+        """
+        slot = self.block_cache.get(KeepLocator(locator).md5sum)
+        return slot is not None and slot.ready.is_set()