ArvadosFile that owns this block
"""
+ return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
+ def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if blockid is None:
blockid = "%s" % uuid.uuid4()
bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
# Check if there are enough small blocks for filling up one in full
pending_write_size = sum([b.size() for b in small_blocks])
if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
- new_bb = _BufferBlock("%s" % uuid.uuid4(), 2**14, None)
- self._bufferblocks[new_bb.blockid] = new_bb
+ new_bb = self._alloc_bufferblock()
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
arvfile = bb.owner
0,
bb.size(),
new_bb.write_pointer - bb.size())])
- bb.clear()
- del self._bufferblocks[bb.blockid]
+ self._delete_bufferblock(bb.blockid)
self.commit_bufferblock(new_bb, sync=sync)
def commit_bufferblock(self, block, sync):
@synchronized
def delete_bufferblock(self, locator):
+ self._delete_bufferblock(locator)
+
+ def _delete_bufferblock(self, locator):
bb = self._bufferblocks[locator]
bb.clear()
del self._bufferblocks[locator]
self._writers.add(writer)
@synchronized
- def remove_writer(self, writer):
+ def remove_writer(self, writer, flush):
"""
Called from ArvadosFileWriter.close(). Remove a writer reference from the list
and do some block maintenance tasks.
"""
self._writers.remove(writer)
- if self.size() > config.KEEP_BLOCK_SIZE / 2:
+ if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
# File writer closed, not small enough for repacking
self.flush()
elif self.closed():
def flush(self):
self.arvadosfile.flush()
- def close(self):
+ def close(self, flush=True):
if not self.closed:
- self.arvadosfile.remove_writer(self)
+ self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()