X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/31568ea331306a574be758fc60b090ecee3bc005..ef6f7202858cba65e06cc1a32d52ee2305687bc8:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 5bac10ec11..c394dab810 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -432,6 +432,9 @@ class _BlockManager(object): ArvadosFile that owns this block """ + return self._alloc_bufferblock(blockid, starting_capacity, owner) + + def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): if blockid is None: blockid = "%s" % uuid.uuid4() bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) @@ -566,8 +569,7 @@ class _BlockManager(object): # Check if there are enough small blocks for filling up one in full pending_write_size = sum([b.size() for b in small_blocks]) if force or (pending_write_size >= config.KEEP_BLOCK_SIZE): - new_bb = _BufferBlock("%s" % uuid.uuid4(), 2**14, None) - self._bufferblocks[new_bb.blockid] = new_bb + new_bb = self._alloc_bufferblock() while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: bb = small_blocks.pop(0) arvfile = bb.owner @@ -576,8 +578,7 @@ class _BlockManager(object): 0, bb.size(), new_bb.write_pointer - bb.size())]) - bb.clear() - del self._bufferblocks[bb.blockid] + self._delete_bufferblock(bb.blockid) self.commit_bufferblock(new_bb, sync=sync) def commit_bufferblock(self, block, sync): @@ -629,6 +630,9 @@ class _BlockManager(object): @synchronized def delete_bufferblock(self, locator): + self._delete_bufferblock(locator) + + def _delete_bufferblock(self, locator): bb = self._bufferblocks[locator] bb.clear() del self._bufferblocks[locator] @@ -830,14 +834,14 @@ class ArvadosFile(object): self._writers.add(writer) @synchronized - def remove_writer(self, writer): + def remove_writer(self, writer, flush): """ Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks. """ self._writers.remove(writer) - if self.size() > config.KEEP_BLOCK_SIZE / 2: + if flush or self.size() > config.KEEP_BLOCK_SIZE / 2: # File writer closed, not small enough for repacking self.flush() elif self.closed(): @@ -1162,7 +1166,7 @@ class ArvadosFileWriter(ArvadosFileReader): def flush(self): self.arvadosfile.flush() - def close(self): + def close(self, flush=True): if not self.closed: - self.arvadosfile.remove_writer(self) + self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close()