DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None):
+ def __init__(self, keep, copies=None, put_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+ if put_threads:
+ self.num_put_threads = put_threads
+ else:
+ self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self._pending_write_size = 0
return
self._keep.get(b)
except Exception:
- pass
+ _logger.exception("Exception doing block prefetch")
@synchronized
def start_get_threads(self):
return 0
@synchronized
- def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+ def manifest_text(self, stream_name=".", portable_locators=False,
+ normalize=False, only_committed=False):
buf = ""
filestream = []
for segment in self.segments:
loc = segment.locator
- if loc.startswith("bufferblock"):
+ if self.parent._my_block_manager().is_bufferblock(loc):
+ if only_committed:
+ continue
loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()