# Fallback count of put threads: __init__ assigns this to num_put_threads
# when the caller passes no (or a falsy) put_threads value.
# NOTE(review): referenced elsewhere as _BlockManager.DEFAULT_PUT_THREADS, so
# these are presumably class attributes of _BlockManager -- confirm.
DEFAULT_PUT_THREADS = 2
# Count of get threads: __init__ always assigns this to num_get_threads.
DEFAULT_GET_THREADS = 2
def __init__(self, keep, copies=None, put_threads=None):
    """Initialise the block manager's bookkeeping state.

    keep: KeepClient object to use
    copies: stored unchanged on self.copies (presumably the desired
        replication level -- confirm against callers)
    put_threads: number of put threads to use; any falsy value (None or 0)
        selects _BlockManager.DEFAULT_PUT_THREADS instead
    """
    self._keep = keep
    # Ordered mapping, so buffer blocks keep their insertion order.
    self._bufferblocks = collections.OrderedDict()
    self._prefetch_threads = None
    self.lock = threading.Lock()
    self.prefetch_enabled = True
    # A falsy put_threads falls back to the class-wide default.
    self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
    self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
    self.copies = copies
    self._pending_write_size = 0
def writable(self):
    """Return whatever self.parent.writable() reports."""
    owner = self.parent
    return owner.writable()
@synchronized
def permission_expired(self, as_of_dt=None):
    """Return True if any of this file's segment locators is expired.

    as_of_dt: forwarded unchanged to KeepLocator.permission_expired()
        (presumably the reference time to test against; None leaves the
        choice to that method -- confirm against KeepLocator).
    """
    # any() stops at the first expired locator, like an early return.
    return any(
        KeepLocator(seg.locator).permission_expired(as_of_dt)
        for seg in self._segments
    )

@synchronized
def segments(self):
    """Return a shallow copy of the internal segment list."""
    snapshot = copy.copy(self._segments)
    return snapshot
return 0
@synchronized
- def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+ def manifest_text(self, stream_name=".", portable_locators=False,
+ normalize=False, only_committed=False):
buf = ""
filestream = []
for segment in self.segments:
loc = segment.locator
- if loc.startswith("bufferblock"):
+ if self.parent._my_block_manager().is_bufferblock(loc):
+ if only_committed:
+ continue
loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()