self._put_queue = None
self._put_threads = None
self.lock = threading.Lock()
- self.prefetch_enabled = True
+ self.prefetch_lookahead = self._keep.num_prefetch_threads
self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
self.copies = copies
self.storage_classes = storage_classes_func or (lambda: [])
"""Initiate a background download of a block.
"""
- if not self.prefetch_enabled:
+ if not self.prefetch_lookahead:
return
with self.lock:
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
+
prefetch = None
- if self.parent._my_block_manager()._keep.num_prefetch_threads > 0 and (self._read_counter % 128) == 0:
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads,
- limit=(1+self.parent._my_block_manager()._keep.num_prefetch_threads))
- self._read_counter += 1
+ prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
+ if prefetch_lookahead:
+ # Checking for prefetch on every read() call is surprisingly
+ # expensive when we're trying to deliver data at 600+ MiB/s and
+ # want the read() fast path to be as lightweight as possible.
+ #
+ # Only checking once every 128 read operations
+ # dramatically reduces the overhead while still
+ # getting the benefit of prefetching (e.g. when
+ # reading 128 KiB at a time, the prefetch check runs
+ # once every 16 MiB).
+ self._read_counter = (self._read_counter+1) % 128
+ if self._read_counter == 1:
+ prefetch = locators_and_ranges(self._segments,
+ offset + size,
+ config.KEEP_BLOCK_SIZE * prefetch_lookahead,
+ limit=(1+prefetch_lookahead))
locs = set()
data = []