18941: bugfixing prefetch
author Peter Amstutz <peter.amstutz@curii.com>
Tue, 29 Mar 2022 15:42:11 +0000 (15:42 +0000)
committer Peter Amstutz <peter.amstutz@curii.com>
Tue, 29 Mar 2022 16:02:50 +0000 (16:02 +0000)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/python/arvados/arvfile.py
sdk/python/arvados/keep.py

index 0fcdc1e6334957f27a5ff1f10fbdedcf2716609a..b21ebd331769109ca0cca3b23e2c45f17ed2c9ed 100644 (file)
@@ -479,7 +479,7 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 2
+    DEFAULT_GET_THREADS = 4
 
     def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
         """keep: KeepClient object to use"""
@@ -593,6 +593,9 @@ class _BlockManager(object):
                 b = self._prefetch_queue.get()
                 if b is None:
                     return
+                if self._keep.has_cache_slot(b):
+                    continue
+                _logger.debug("prefetching %s", b)
                 self._keep.get(b)
             except Exception:
                 _logger.exception("Exception doing block prefetch")
@@ -841,7 +844,7 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
-        if self._keep.get_from_cache(locator) is not None:
+        if self._keep.has_cache_slot(locator):
             return
 
         with self.lock:
@@ -849,6 +852,7 @@ class _BlockManager(object):
                 return
 
         self.start_get_threads()
+        # _logger.debug("pushing %s to prefetch", locator)
         self._prefetch_queue.put(locator)
 
 
@@ -1099,7 +1103,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32)
 
         locs = set()
         data = []
@@ -1117,7 +1121,10 @@ class ArvadosFile(object):
                 self.parent._my_block_manager().block_prefetch(lr.locator)
                 locs.add(lr.locator)
 
-        return b''.join(data)
+        if len(data) == 1:
+            return data[0]
+        else:
+            return b''.join(data)
 
     @must_be_writable
     @synchronized
index 1a83eae944c59f8dde5e3a7c63de8bbe9c62a9c9..df01c3a55b74e3e1792f1c8e0b0d1846e45e20ee 100644 (file)
@@ -176,7 +176,7 @@ class Keep(object):
 
 class KeepBlockCache(object):
     # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(256 * 1024 * 1024)):
+    def __init__(self, cache_max=(1024 * 1024 * 1024)):
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
@@ -1036,14 +1036,19 @@ class KeepClient(object):
         else:
             return None
 
-    def get_from_cache(self, loc):
+    def get_from_cache(self, loc_s):
         """Fetch a block only if is in the cache, otherwise return None."""
-        slot = self.block_cache.get(loc)
+        locator = KeepLocator(loc_s)
+        slot = self.block_cache.get(locator.md5sum)
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
             return None
 
+    def has_cache_slot(self, loc_s):
+        locator = KeepLocator(loc_s)
+        return self.block_cache.get(locator.md5sum) is not None
+
     def refresh_signature(self, loc):
         """Ask Keep to get the remote block and return its local signature"""
         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
@@ -1333,5 +1338,3 @@ class KeepClient(object):
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
 
-    def is_cached(self, locator):
-        return self.block_cache.reserve_cache(expect_hash)