Add a cache_slot_get flag to get(). When it is False, get() returns None
immediately if a cache slot already exists for the block.
By default (cache_slot_get=True), if multiple readers call get() on the
same block, the first one starts downloading the block. The others wait
for the download to finish and then return the block content from the
cache slot.
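With cache_slot_get=False, if multiple readers call get() on the same
block, the first one starts downloading the block and the others return
None immediately. The block prefetch code now uses this flag instead of
checking has_cache_slot() first, and has_cache_slot() is removed.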
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>
b = self._prefetch_queue.get()
if b is None:
return
b = self._prefetch_queue.get()
if b is None:
return
- if self._keep.has_cache_slot(b):
- continue
- _logger.debug("prefetching %s", b)
- self._keep.get(b)
+ self._keep.get(b, cache_slot_get=False)
except Exception:
_logger.exception("Exception doing block prefetch")
except Exception:
_logger.exception("Exception doing block prefetch")
if not self.prefetch_enabled:
return
if not self.prefetch_enabled:
return
- if self._keep.has_cache_slot(locator):
- return
-
with self.lock:
if locator in self._bufferblocks:
return
with self.lock:
if locator in self._bufferblocks:
return
- def has_cache_slot(self, loc_s):
- locator = KeepLocator(loc_s)
- return self.block_cache.get(locator.md5sum) is not None
-
def refresh_signature(self, loc):
"""Ask Keep to get the remote block and return its local signature"""
now = datetime.datetime.utcnow().isoformat("T") + 'Z'
def refresh_signature(self, loc):
"""Ask Keep to get the remote block and return its local signature"""
now = datetime.datetime.utcnow().isoformat("T") + 'Z'
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, cache_slot_get=True):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
self.hits_counter.add(1)
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
self.hits_counter.add(1)
- blob = slot.get()
- if blob is None:
- raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s))
- return blob
+ if cache_slot_get:
+ blob = slot.get()
+ if blob is None:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s))
+ return blob
+ else:
+ return None
self.misses_counter.add(1)
self.misses_counter.add(1)