b = self._prefetch_queue.get()
if b is None:
return
- if self._keep.has_cache_slot(b):
- continue
- _logger.debug("prefetching %s", b)
- self._keep.get(b)
+ self._keep.get(b, cache_slot_get=False)
except Exception:
_logger.exception("Exception doing block prefetch")
if not self.prefetch_enabled:
return
- if self._keep.has_cache_slot(locator):
- return
-
with self.lock:
if locator in self._bufferblocks:
return
else:
return None
- def has_cache_slot(self, loc_s):
- locator = KeepLocator(loc_s)
- return self.block_cache.get(locator.md5sum) is not None
-
def refresh_signature(self, loc):
"""Ask Keep to get the remote block and return its local signature"""
now = datetime.datetime.utcnow().isoformat("T") + 'Z'
def get(self, loc_s, **kwargs):
    """Fetch the block named by ``loc_s`` using the GET method.

    Thin wrapper: forwards ``loc_s`` and every keyword argument
    unchanged to ``_get_or_head`` with ``method="GET"`` and returns
    whatever that call returns.
    """
    fetch = self._get_or_head
    return fetch(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, cache_slot_get=True):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
self.hits_counter.add(1)
- blob = slot.get()
- if blob is None:
- raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s))
- return blob
+ if cache_slot_get:
+ blob = slot.get()
+ if blob is None:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s))
+ return blob
+ else:
+ return None
self.misses_counter.add(1)