# Fallback thread counts: __init__ assigns these to num_put_threads and
# num_get_threads (the put count only when no put_threads value is given).
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
# NOTE(review): this constructor is shown in patch form -- a leading "- "
# marks the old text and a leading "+ " marks its replacement.  The change
# adds an optional put_threads argument.  Lines between patch hunks may have
# been elided, so this may not be the complete constructor body.
- def __init__(self, keep, copies=None):
+ def __init__(self, keep, copies=None, put_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
# Maps are kept in insertion order (OrderedDict); starts empty.
self._bufferblocks = collections.OrderedDict()
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
# Any falsy put_threads (None, and also 0) falls back to the class default.
+ if put_threads:
+ self.num_put_threads = put_threads
+ else:
+ self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
# copies is stored as given; presumably the desired replication count
# for written blocks -- TODO confirm against the code that reads it.
self.copies = copies
self._pending_write_size = 0
return
self._keep.get(b)
except Exception:
- pass
+ _logger.exception("Exception doing block prefetch")
@synchronized
def start_get_threads(self):
# Update the pending write size count with its true value, just in case
# some small file was opened, written and closed several times.
- if not force:
- self._pending_write_size = sum([b.size() for b in small_blocks])
- if self._pending_write_size < config.KEEP_BLOCK_SIZE:
- return
+ self._pending_write_size = sum([b.size() for b in small_blocks])
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+ return
new_bb = self._alloc_bufferblock()
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
def writable(self):
    """Return whether this object may be written to.

    The answer is delegated entirely to the parent: whatever
    self.parent.writable() returns is returned unchanged.
    """
    return self.parent.writable()
@synchronized
def permission_expired(self, as_of_dt=None):
    """Return True if any of the segments' locators is expired.

    Each entry of self._segments has its locator wrapped in a KeepLocator,
    and as_of_dt is forwarded unchanged to KeepLocator.permission_expired().
    Checking stops at the first expired locator.  Returns False when there
    are no segments or none of them is expired.
    """
    return any(KeepLocator(seg.locator).permission_expired(as_of_dt)
               for seg in self._segments)
+
@synchronized
def segments(self):
    """Return a shallow copy of the segment list (self._segments).

    Because a copy is returned, adding or removing entries in the result
    does not alter self._segments.  The copy is shallow (copy.copy), so the
    individual segment objects are shared with the original list.
    """
    return copy.copy(self._segments)
return 0
@synchronized
- def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+ def manifest_text(self, stream_name=".", portable_locators=False,
+ normalize=False, only_committed=False):
buf = ""
filestream = []
for segment in self.segments:
loc = segment.locator
- if loc.startswith("bufferblock"):
+ if self.parent._my_block_manager().is_bufferblock(loc):
+ if only_committed:
+ continue
loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()