21639: Reenable prefetch, but not on every read()
[arvados.git] / sdk / python / arvados / arvfile.py
index e915ff2ac0a37c86c635f9ce68f52d227fed1725..666efb078d58e3a798e9721613919f0d0bf82469 100644 (file)
@@ -100,7 +100,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             yield data
 
     def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self.name)
+        return re.sub(r'\.(bz2|gz)$', '', self.name)
 
     @_FileLikeObjectBase._before_close
     def seek(self, pos, whence=os.SEEK_SET):
@@ -479,20 +479,20 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+    def __init__(self, keep,
+                 copies=None,
+                 put_threads=None,
+                 num_retries=None,
+                 storage_classes_func=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
-        self._prefetch_queue = None
-        self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -586,29 +586,6 @@ class _BlockManager(object):
                     thread.daemon = True
                     thread.start()
 
-    def _block_prefetch_worker(self):
-        """The background downloader thread."""
-        while True:
-            try:
-                b = self._prefetch_queue.get()
-                if b is None:
-                    return
-                self._keep.get(b)
-            except Exception:
-                _logger.exception("Exception doing block prefetch")
-
-    @synchronized
-    def start_get_threads(self):
-        if self._prefetch_threads is None:
-            self._prefetch_queue = queue.Queue()
-            self._prefetch_threads = []
-            for i in range(0, self.num_get_threads):
-                thread = threading.Thread(target=self._block_prefetch_worker)
-                self._prefetch_threads.append(thread)
-                thread.daemon = True
-                thread.start()
-
-
     @synchronized
     def stop_threads(self):
         """Shut down and wait for background upload and download threads to finish."""
@@ -621,14 +598,6 @@ class _BlockManager(object):
         self._put_threads = None
         self._put_queue = None
 
-        if self._prefetch_threads is not None:
-            for t in self._prefetch_threads:
-                self._prefetch_queue.put(None)
-            for t in self._prefetch_threads:
-                t.join()
-        self._prefetch_threads = None
-        self._prefetch_queue = None
-
     def __enter__(self):
         return self
 
@@ -760,9 +729,10 @@ class _BlockManager(object):
         self._delete_bufferblock(locator)
 
     def _delete_bufferblock(self, locator):
-        bb = self._bufferblocks[locator]
-        bb.clear()
-        del self._bufferblocks[locator]
+        if locator in self._bufferblocks:
+            bb = self._bufferblocks[locator]
+            bb.clear()
+            del self._bufferblocks[locator]
 
     def get_block_contents(self, locator, num_retries, cache_only=False):
         """Fetch a block.
@@ -827,28 +797,20 @@ class _BlockManager(object):
                         owner.flush(sync=True)
                     self.delete_bufferblock(k)
 
+        self.stop_threads()
+
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
-
-        This assumes that the underlying KeepClient implements a block cache,
-        so repeated requests for the same block will not result in repeated
-        downloads (unless the block is evicted from the cache.)  This method
-        does not block.
-
         """
 
         if not self.prefetch_enabled:
             return
 
-        if self._keep.get_from_cache(locator) is not None:
-            return
-
         with self.lock:
             if locator in self._bufferblocks:
                 return
 
-        self.start_get_threads()
-        self._prefetch_queue.put(locator)
+        self._keep.block_prefetch(locator)
 
 
 class ArvadosFile(object):
@@ -863,7 +825,7 @@ class ArvadosFile(object):
     """
 
     __slots__ = ('parent', 'name', '_writers', '_committed',
-                 '_segments', 'lock', '_current_bblock', 'fuse_entry')
+                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 
     def __init__(self, parent, name, stream=[], segments=[]):
         """
@@ -884,6 +846,7 @@ class ArvadosFile(object):
         for s in segments:
             self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
+        self._read_counter = 0
 
     def writable(self):
         return self.parent.writable()
@@ -1098,7 +1061,11 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+            prefetch = None
+            if self.parent._my_block_manager()._keep.num_prefetch_threads > 0 and (self._read_counter % 128) == 0:
+                prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads,
+                                               limit=(1+self.parent._my_block_manager()._keep.num_prefetch_threads))
+            self._read_counter += 1
 
         locs = set()
         data = []
@@ -1106,17 +1073,21 @@ class ArvadosFile(object):
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
                 blockview = memoryview(block)
-                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                 locs.add(lr.locator)
             else:
                 break
 
-        for lr in prefetch:
-            if lr.locator not in locs:
-                self.parent._my_block_manager().block_prefetch(lr.locator)
-                locs.add(lr.locator)
+        if prefetch:
+            for lr in prefetch:
+                if lr.locator not in locs:
+                    self.parent._my_block_manager().block_prefetch(lr.locator)
+                    locs.add(lr.locator)
 
-        return b''.join(data)
+        if len(data) == 1:
+            return data[0]
+        else:
+            return b''.join(data)
 
     @must_be_writable
     @synchronized