Merge branch '18943-created-at-index' refs #18943
[arvados.git] / sdk / python / arvados / arvfile.py
index 0fcdc1e6334957f27a5ff1f10fbdedcf2716609a..2ce0e46b30bd67ad948f832183ab091865c2ea53 100644 (file)
@@ -481,7 +481,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -492,7 +492,7 @@ class _BlockManager(object):
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -593,7 +593,7 @@ class _BlockManager(object):
                 b = self._prefetch_queue.get()
                 if b is None:
                     return
-                self._keep.get(b)
+                self._keep.get(b, prefetch=True)
             except Exception:
                 _logger.exception("Exception doing block prefetch")
 
@@ -841,9 +841,6 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
-        if self._keep.get_from_cache(locator) is not None:
-            return
-
         with self.lock:
             if locator in self._bufferblocks:
                 return
@@ -1099,7 +1096,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
 
         locs = set()
         data = []