From: Peter Amstutz Date: Tue, 29 Mar 2022 20:52:47 +0000 (-0400) Subject: 18941: Separate get() behavior for prefetch X-Git-Tag: 2.4.0~20^2~8 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/a46914b92aaae9b9d8eae4021d28360abf835a76 18941: Separate get() behavior for prefetch Add flag to get() which causes it to return None immediately if a cache slot already exists for a block. In the standard behavior, if multiple readers try to get() the same block, the first one will start downloading the block, and all the others will wait and return the block content from the cache slot when complete. With the new optional behavior, if multiple readers try to get() the same block, the first one will start downloading the block, and all the others will immediately return None. Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index fbf593d026..a13575b715 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -593,10 +593,7 @@ class _BlockManager(object): b = self._prefetch_queue.get() if b is None: return - if self._keep.has_cache_slot(b): - continue - _logger.debug("prefetching %s", b) - self._keep.get(b) + self._keep.get(b, cache_slot_get=False) except Exception: _logger.exception("Exception doing block prefetch") @@ -844,9 +841,6 @@ class _BlockManager(object): if not self.prefetch_enabled: return - if self._keep.has_cache_slot(locator): - return - with self.lock: if locator in self._bufferblocks: return diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 94104586de..43d71f17e4 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -1045,10 +1045,6 @@ class KeepClient(object): else: return None - def has_cache_slot(self, loc_s): - locator = KeepLocator(loc_s) - return self.block_cache.get(locator.md5sum) is not None - def refresh_signature(self, loc): """Ask Keep to get the remote block and return its local signature""" now = datetime.datetime.utcnow().isoformat("T") + 'Z' @@ -1062,7 +1058,7 @@ class KeepClient(object): def get(self, loc_s, **kwargs): return self._get_or_head(loc_s, method="GET", **kwargs) - def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None): + def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, cache_slot_get=True): """Get data from Keep. This method fetches one or more blocks of data from Keep. It @@ -1102,11 +1098,14 @@ class KeepClient(object): slot, first = self.block_cache.reserve_cache(locator.md5sum) if not first: self.hits_counter.add(1) - blob = slot.get() - if blob is None: - raise arvados.errors.KeepReadError( - "failed to read {}".format(loc_s)) - return blob + if cache_slot_get: + blob = slot.get() + if blob is None: + raise arvados.errors.KeepReadError( + "failed to read {}".format(loc_s)) + return blob + else: + return None self.misses_counter.add(1)