"""
DEFAULT_PUT_THREADS = 2
- DEFAULT_GET_THREADS = 4
+ DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+ def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self.lock = threading.Lock()
self.prefetch_enabled = True
self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
b = self._prefetch_queue.get()
if b is None:
return
- if self._keep.has_cache_slot(b):
- continue
- _logger.debug("prefetching %s", b)
- self._keep.get(b)
+ self._keep.get(b, prefetch=True)
except Exception:
_logger.exception("Exception doing block prefetch")
if not self.prefetch_enabled:
return
- if self._keep.has_cache_slot(locator):
- return
-
with self.lock:
if locator in self._bufferblocks:
return
self.start_get_threads()
- # _logger.debug("pushing %s to prefetch", locator)
self._prefetch_queue.put(locator)
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
locs = set()
data = []
self.parent._my_block_manager().block_prefetch(lr.locator)
locs.add(lr.locator)
- if len(data) == 1:
- return data[0]
- else:
- return b''.join(data)
+ return b''.join(data)
@must_be_writable
@synchronized