21639: Improve critical path of read() from cache
authorPeter Amstutz <peter.amstutz@curii.com>
Mon, 1 Apr 2024 19:58:06 +0000 (15:58 -0400)
committerPeter Amstutz <peter.amstutz@curii.com>
Tue, 2 Apr 2024 01:01:58 +0000 (21:01 -0400)
* Don't use tobytes(), it makes a copy, and it should be zero-copy.
* Prefetching adds a lot of overhead.  Don't do it.
* Don't use a list comprehension to calculate cache size

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/python/arvados/arvfile.py
sdk/python/arvados/keep.py
services/fuse/arvados_fuse/command.py

index 4b95835aac0f25a57fd999a2c5f9cff0e54014e1..0cc7d25a331871c88860357853d1a21898eae965 100644 (file)
@@ -1060,7 +1060,8 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
+            if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+                prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
 
         locs = set()
         data = []
@@ -1068,17 +1069,21 @@ class ArvadosFile(object):
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
                 blockview = memoryview(block)
-                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                 locs.add(lr.locator)
             else:
                 break
 
-        for lr in prefetch:
-            if lr.locator not in locs:
-                self.parent._my_block_manager().block_prefetch(lr.locator)
-                locs.add(lr.locator)
+        if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+            for lr in prefetch:
+                if lr.locator not in locs:
+                    self.parent._my_block_manager().block_prefetch(lr.locator)
+                    locs.add(lr.locator)
 
-        return b''.join(data)
+        if len(data) == 1:
+            return data[0]
+        else:
+            return b''.join(data)
 
     @must_be_writable
     @synchronized
index 1d0fc5f159b18e8b490d044cb33ed3b0cfbf5921..6b34a1f9331688f6ab7f4e28c85664aec1abb5e3 100644 (file)
@@ -271,8 +271,14 @@ class KeepBlockCache(object):
         # Try and make sure the contents of the cache do not exceed
         # the supplied maximums.
 
+        sm = 0
+        for slot in self._cache.values():
+            sm += slot.size()
+
+        if sm <= cache_max and len(self._cache) <= max_slots:
+            return
+
         _evict_candidates = collections.deque(self._cache.values())
-        sm = sum([slot.size() for slot in _evict_candidates])
         while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
             slot = _evict_candidates.popleft()
             if not slot.ready.is_set():
@@ -926,7 +932,10 @@ class KeepClient(object):
         self.misses_counter = Counter()
         self._storage_classes_unsupported_warning = False
         self._default_classes = []
-        self.num_prefetch_threads = num_prefetch_threads or 2
+        if num_prefetch_threads is not None:
+            self.num_prefetch_threads = num_prefetch_threads
+        else:
+            self.num_prefetch_threads = 2
         self._prefetch_queue = None
         self._prefetch_threads = None
 
index 719ec7ee959701fde58bfef0dfb8b3c46dc4b895..29ace2e52e6f82b1b784049cd8fc05baeac75575 100644 (file)
@@ -487,7 +487,8 @@ class Mount(object):
             # layer actually ends up being slower.
             # Experimentally, capping 7 threads seems to
             # be a sweet spot.
-            prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+            #prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+            prefetch_threads = 0
 
             self.api = arvados.safeapi.ThreadSafeApiCache(
                 apiconfig=arvados.config.settings(),