18941: Separate get() behavior for prefetch
[arvados.git] / sdk / python / arvados / arvfile.py
index e915ff2ac0a37c86c635f9ce68f52d227fed1725..a13575b715922f306c14ce4bb6a82fbe766fae8f 100644 (file)
@@ -481,7 +481,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -492,7 +492,7 @@ class _BlockManager(object):
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -593,7 +593,7 @@ class _BlockManager(object):
                 b = self._prefetch_queue.get()
                 if b is None:
                     return
-                self._keep.get(b)
+                self._keep.get(b, cache_slot_get=False)
             except Exception:
                 _logger.exception("Exception doing block prefetch")
 
@@ -760,9 +760,10 @@ class _BlockManager(object):
         self._delete_bufferblock(locator)
 
     def _delete_bufferblock(self, locator):
-        bb = self._bufferblocks[locator]
-        bb.clear()
-        del self._bufferblocks[locator]
+        if locator in self._bufferblocks:
+            bb = self._bufferblocks[locator]
+            bb.clear()
+            del self._bufferblocks[locator]
 
     def get_block_contents(self, locator, num_retries, cache_only=False):
         """Fetch a block.
@@ -840,14 +841,12 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
-        if self._keep.get_from_cache(locator) is not None:
-            return
-
         with self.lock:
             if locator in self._bufferblocks:
                 return
 
         self.start_get_threads()
+        # _logger.debug("pushing %s to prefetch", locator)
         self._prefetch_queue.put(locator)
 
 
@@ -1098,7 +1097,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
 
         locs = set()
         data = []