X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e502060ffe4f68d33e2cca8f8d7544ce40d53eb7..c39ba5193005a4e9f619901f8348f11fada88df0:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index edeb910570..aad3ce12a5 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -38,6 +38,12 @@ def split(path): stream_name, file_name = '.', path return stream_name, file_name + +class UnownedBlockError(Exception): + """Raised when there's an writable block without an owner on the BlockManager.""" + pass + + class _FileLikeObjectBase(object): def __init__(self, name, mode): self.name = name @@ -90,11 +96,23 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): pos += self._filepos elif whence == os.SEEK_END: pos += self.size() - self._filepos = min(max(pos, 0L), self.size()) + if pos < 0L: + raise IOError(errno.EINVAL, "Tried to seek to negative file offset.") + self._filepos = pos + return self._filepos def tell(self): return self._filepos + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return True + @_FileLikeObjectBase._before_close @retry_method def readall(self, size=2**20, num_retries=None): @@ -166,13 +184,13 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): return ''.join(data).splitlines(True) def size(self): - raise NotImplementedError() + raise IOError(errno.ENOSYS, "Not implemented") def read(self, size, num_retries=None): - raise NotImplementedError() + raise IOError(errno.ENOSYS, "Not implemented") def readfrom(self, start, size, num_retries=None): - raise NotImplementedError() + raise IOError(errno.ENOSYS, "Not implemented") class StreamFileReader(ArvadosFileReaderBase): @@ -422,6 +440,7 @@ class _BlockManager(object): self.copies = copies self._pending_write_size = 0 self.threads_lock = threading.Lock() + self.padding_block = None @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -571,7 +590,11 @@ class _BlockManager(object): # A WRITABLE block always has an owner. # A WRITABLE block with its owner.closed() implies that it's # size is <= KEEP_BLOCK_SIZE/2. - small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + try: + small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + except AttributeError: + # Writable blocks without owner shouldn't exist. + raise UnownedBlockError() if len(small_blocks) <= 1: # Not enough small blocks for repacking @@ -643,6 +666,22 @@ class _BlockManager(object): def get_bufferblock(self, locator): return self._bufferblocks.get(locator) + @synchronized + def get_padding_block(self): + """Get a bufferblock 64 MB in size consisting of all zeros, used as padding + when using truncate() to extend the size of a file. + + For reference (and possible future optimization), the md5sum of the + padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864 + + """ + + if self.padding_block is None: + self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE) + self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE + self.commit_bufferblock(self.padding_block, False) + return self.padding_block + @synchronized def delete_bufferblock(self, locator): self._delete_bufferblock(locator) @@ -801,7 +840,7 @@ class ArvadosFile(object): self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) - self._committed = False + self.set_committed(False) def __eq__(self, other): if other is self: @@ -841,9 +880,18 @@ class ArvadosFile(object): self._segments = segs @synchronized - def set_committed(self): - """Set committed flag to True""" - self._committed = True + def set_committed(self, value=True): + """Set committed flag. + + If value is True, set committed to be True. + + If value is False, set committed to be False for this and all parents. + """ + if value == self._committed: + return + self._committed = value + if self._committed is False and self.parent is not None: + self.parent.set_committed(False) @synchronized def committed(self): @@ -881,11 +929,11 @@ class ArvadosFile(object): @must_be_writable @synchronized def truncate(self, size): - """Shrink the size of the file. + """Shrink or expand the size of the file. If `size` is less than the size of the file, the file contents after `size` will be discarded. If `size` is greater than the current size - of the file, an IOError will be raised. + of the file, it will be filled with zero bytes. """ if size < self.size(): @@ -904,9 +952,19 @@ class ArvadosFile(object): new_segs.append(r) self._segments = new_segs - self._committed = False + self.set_committed(False) elif size > self.size(): - raise IOError(errno.EINVAL, "truncate() does not support extending the file size") + padding = self.parent._my_block_manager().get_padding_block() + diff = size - self.size() + while diff > config.KEEP_BLOCK_SIZE: + self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0)) + diff -= config.KEEP_BLOCK_SIZE + if diff > 0: + self._segments.append(Range(padding.blockid, self.size(), diff, 0)) + self.set_committed(False) + else: + # size == self.size() + pass def readfrom(self, offset, size, num_retries, exact=False): """Read up to `size` bytes from the file starting at `offset`. @@ -942,23 +1000,28 @@ class ArvadosFile(object): return ''.join(data) def _repack_writes(self, num_retries): - """Test if the buffer block has more data than actual segments. + """Optimize buffer block by repacking segments in file sequence. - This happens when a buffered write over-writes a file range written in - a previous buffered write. Re-pack the buffer block for efficiency - and to avoid leaking information. + When the client makes random writes, they appear in the buffer block in + the sequence they were written rather than the sequence they appear in + the file. This makes for inefficient, fragmented manifests. Attempt + to optimize by repacking writes in file sequence. """ segs = self._segments - # Sum up the segments to get the total bytes of the file referencing - # into the buffer block. + # Collect the segments that reference the buffer block. bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid] + + # Collect total data referenced by segments (could be smaller than + # bufferblock size if a portion of the file was written and + # then overwritten). write_total = sum([s.range_size for s in bufferblock_segs]) - if write_total < self._current_bblock.size(): - # There is more data in the buffer block than is actually accounted for by segments, so - # re-pack into a new buffer by copying over to a new buffer block. + if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1: + # If there's more than one segment referencing this block, it is + # due to out-of-order writes and will produce a fragmented + # manifest, so try to optimize by re-packing into a new buffer. contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries) new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self) for t in bufferblock_segs: @@ -980,7 +1043,7 @@ class ArvadosFile(object): return if offset > self.size(): - raise ArgumentError("Offset is past the end of the file") + self.truncate(offset) if len(data) > config.KEEP_BLOCK_SIZE: # Chunk it up into smaller writes @@ -991,7 +1054,7 @@ class ArvadosFile(object): n += config.KEEP_BLOCK_SIZE return - self._committed = False + self.set_committed(False) if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) @@ -1054,7 +1117,7 @@ class ArvadosFile(object): def _add_segment(self, blocks, pos, size): """Internal implementation of add_segment.""" - self._committed = False + self.set_committed(False) for lr in locators_and_ranges(blocks, pos, size): last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) @@ -1091,7 +1154,7 @@ class ArvadosFile(object): @must_be_writable @synchronized def _reparent(self, newparent, newname): - self._committed = False + self.set_committed(False) self.flush(sync=True) self.parent.remove(self.name) self.parent = newparent @@ -1164,6 +1227,9 @@ class ArvadosFileWriter(ArvadosFileReader): self.mode = mode self.arvadosfile.add_writer(self) + def writable(self): + return True + @_FileLikeObjectBase._before_close @retry_method def write(self, data, num_retries=None): @@ -1185,8 +1251,6 @@ class ArvadosFileWriter(ArvadosFileReader): if size is None: size = self._filepos self.arvadosfile.truncate(size) - if self._filepos > self.size(): - self._filepos = self.size() @_FileLikeObjectBase._before_close def flush(self):