From 32c9b81e2c1bce19673c73cb14490f6e9dde0fc6 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 29 Mar 2022 15:42:11 +0000 Subject: [PATCH] 18941: bugfixing prefetch Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/python/arvados/arvfile.py | 15 +++++++++++---- sdk/python/arvados/keep.py | 13 ++++++++----- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 0fcdc1e633..b21ebd3317 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -479,7 +479,7 @@ class _BlockManager(object): """ DEFAULT_PUT_THREADS = 2 - DEFAULT_GET_THREADS = 2 + DEFAULT_GET_THREADS = 4 def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None): """keep: KeepClient object to use""" @@ -593,6 +593,9 @@ class _BlockManager(object): b = self._prefetch_queue.get() if b is None: return + if self._keep.has_cache_slot(b): + continue + _logger.debug("prefetching %s", b) self._keep.get(b) except Exception: _logger.exception("Exception doing block prefetch") @@ -841,7 +844,7 @@ class _BlockManager(object): if not self.prefetch_enabled: return - if self._keep.get_from_cache(locator) is not None: + if self._keep.has_cache_slot(locator): return with self.lock: @@ -849,6 +852,7 @@ class _BlockManager(object): return self.start_get_threads() + # _logger.debug("pushing %s to prefetch", locator) self._prefetch_queue.put(locator) @@ -1099,7 +1103,7 @@ class ArvadosFile(object): if size == 0 or offset >= self.size(): return b'' readsegs = locators_and_ranges(self._segments, offset, size) - prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32) + prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32) locs = set() data = [] @@ -1117,7 +1121,10 @@ class ArvadosFile(object): self.parent._my_block_manager().block_prefetch(lr.locator) locs.add(lr.locator) - return b''.join(data) + if len(data) == 1: + return data[0] + else: + return b''.join(data) @must_be_writable @synchronized diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 1a83eae944..df01c3a55b 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -176,7 +176,7 @@ class Keep(object): class KeepBlockCache(object): # Default RAM cache is 256MiB - def __init__(self, cache_max=(256 * 1024 * 1024)): + def __init__(self, cache_max=(1024 * 1024 * 1024)): self.cache_max = cache_max self._cache = [] self._cache_lock = threading.Lock() @@ -1036,14 +1036,19 @@ class KeepClient(object): else: return None - def get_from_cache(self, loc): + def get_from_cache(self, loc_s): """Fetch a block only if is in the cache, otherwise return None.""" - slot = self.block_cache.get(loc) + locator = KeepLocator(loc_s) + slot = self.block_cache.get(locator.md5sum) if slot is not None and slot.ready.is_set(): return slot.get() else: return None + def has_cache_slot(self, loc_s): + locator = KeepLocator(loc_s) + return self.block_cache.get(locator.md5sum) is not None + def refresh_signature(self, loc): """Ask Keep to get the remote block and return its local signature""" now = datetime.datetime.utcnow().isoformat("T") + 'Z' @@ -1333,5 +1338,3 @@ class KeepClient(object): if os.path.exists(os.path.join(self.local_store, locator.md5sum)): return True - def is_cached(self, locator): - return self.block_cache.reserve_cache(expect_hash) -- 2.30.2