"""
DEFAULT_PUT_THREADS = 2
- DEFAULT_GET_THREADS = 2
+ DEFAULT_GET_THREADS = 4
def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
"""keep: KeepClient object to use"""
b = self._prefetch_queue.get()
if b is None:
return
+ if self._keep.has_cache_slot(b):
+ continue
+ _logger.debug("prefetching %s", b)
self._keep.get(b)
except Exception:
_logger.exception("Exception doing block prefetch")
if not self.prefetch_enabled:
return
- if self._keep.get_from_cache(locator) is not None:
+ if self._keep.has_cache_slot(locator):
return
with self.lock:
return
self.start_get_threads()
+ # _logger.debug("pushing %s to prefetch", locator)
self._prefetch_queue.put(locator)
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32)
locs = set()
data = []
self.parent._my_block_manager().block_prefetch(lr.locator)
locs.add(lr.locator)
- return b''.join(data)
+ if len(data) == 1:
+ return data[0]
+ else:
+ return b''.join(data)
@must_be_writable
@synchronized