X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/04951581a941697d68cdaf9af6661c3c412f1bce..8c5f2973a5c5f042d1d12aef1c470b37519fd416:/sdk/python/arvados/arvfile.py

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index d3377854aa..a2ec76a076 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -16,7 +16,7 @@ import uuid
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
 from ._normalize_stream import normalize_stream
-from ._ranges import locators_and_ranges, replace_range, Range
+from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
 from .retry import retry_method
 
 MOD = "mod"
@@ -99,10 +99,20 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         if pos < 0L:
             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
         self._filepos = pos
+        return self._filepos
 
     def tell(self):
         return self._filepos
 
+    def readable(self):
+        return True
+
+    def writable(self):
+        return False
+
+    def seekable(self):
+        return True
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def readall(self, size=2**20, num_retries=None):
@@ -174,13 +184,13 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         return ''.join(data).splitlines(True)
 
     def size(self):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
     def read(self, size, num_retries=None):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
     def readfrom(self, start, size, num_retries=None):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
 
 class StreamFileReader(ArvadosFileReaderBase):
@@ -279,6 +289,7 @@ class _BufferBlock(object):
     PENDING = 1
     COMMITTED = 2
     ERROR = 3
+    DELETED = 4
 
     def __init__(self, blockid, starting_capacity, owner):
         """
@@ -373,10 +384,54 @@ class _BufferBlock(object):
 
     @synchronized
     def clear(self):
+        self._state = _BufferBlock.DELETED
         self.owner = None
         self.buffer_block = None
         self.buffer_view = None
 
+    @synchronized
+    def repack_writes(self):
+        """Optimize buffer block by repacking segments in file sequence.
+
+        When the client makes random writes, they appear in the buffer block in
+        the sequence they were written rather than the sequence they appear in
+        the file. This makes for inefficient, fragmented manifests. Attempt
+        to optimize by repacking writes in file sequence.
+
+        """
+        if self._state != _BufferBlock.WRITABLE:
+            raise AssertionError("Cannot repack non-writable block")
+
+        segs = self.owner.segments()
+
+        # Collect the segments that reference the buffer block.
+        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
+
+        # Collect total data referenced by segments (could be smaller than
+        # bufferblock size if a portion of the file was written and
+        # then overwritten).
+        write_total = sum([s.range_size for s in bufferblock_segs])
+
+        if write_total < self.size() or len(bufferblock_segs) > 1:
+            # If there's more than one segment referencing this block, it is
+            # due to out-of-order writes and will produce a fragmented
+            # manifest, so try to optimize by re-packing into a new buffer.
+            contents = self.buffer_view[0:self.write_pointer].tobytes()
+            new_bb = _BufferBlock(None, write_total, None)
+            for t in bufferblock_segs:
+                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
+                t.segment_offset = new_bb.size() - t.range_size
+
+            self.buffer_block = new_bb.buffer_block
+            self.buffer_view = new_bb.buffer_view
+            self.write_pointer = new_bb.write_pointer
+            self._locator = None
+            new_bb.clear()
+            self.owner.set_segments(segs)
+
+    def __repr__(self):
+        return "<BufferBlock %s>" % (self.blockid)
+
 
 class NoopLock(object):
     def __enter__(self):
@@ -450,7 +505,7 @@ class _BlockManager(object):
 
     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         if blockid is None:
-            blockid = "%s" % uuid.uuid4()
+            blockid = str(uuid.uuid4())
         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -466,7 +521,7 @@ class _BlockManager(object):
           ArvadosFile that owns the new block
 
         """
-        new_blockid = "bufferblock%i" % len(self._bufferblocks)
+        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -571,43 +626,56 @@ class _BlockManager(object):
     @synchronized
     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
+
         self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+            return
 
-            # Search blocks ready for getting packed together before being committed to Keep.
-            # A WRITABLE block always has an owner.
-            # A WRITABLE block with its owner.closed() implies that it's
-            # size is <= KEEP_BLOCK_SIZE/2.
-            try:
-                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-            except AttributeError:
-                # Writable blocks without owner shouldn't exist.
-                raise UnownedBlockError()
+        # Search blocks ready for getting packed together before being committed to Keep.
+        # A WRITABLE block always has an owner.
+        # A WRITABLE block with its owner.closed() implies that it's
+        # size is <= KEEP_BLOCK_SIZE/2.
+        try:
+            small_blocks = [b for b in self._bufferblocks.values()
+                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        except AttributeError:
+            # Writable blocks without owner shouldn't exist.
+            raise UnownedBlockError()
+
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return
 
-            if len(small_blocks) <= 1:
-                # Not enough small blocks for repacking
-                return
+        for bb in small_blocks:
+            bb.repack_writes()
 
-            # Update the pending write size count with its true value, just in case
-            # some small file was opened, written and closed several times.
-            self._pending_write_size = sum([b.size() for b in small_blocks])
-            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
-                return
+        # Update the pending write size count with its true value, just in case
+        # some small file was opened, written and closed several times.
+        self._pending_write_size = sum([b.size() for b in small_blocks])
+
+        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+            return
 
-            new_bb = self._alloc_bufferblock()
-            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
-                bb = small_blocks.pop(0)
-                arvfile = bb.owner
-                self._pending_write_size -= bb.size()
-                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                arvfile.set_segments([Range(new_bb.blockid,
-                                            0,
-                                            bb.size(),
-                                            new_bb.write_pointer - bb.size())])
-                self._delete_bufferblock(bb.blockid)
-            self.commit_bufferblock(new_bb, sync=sync)
+        new_bb = self._alloc_bufferblock()
+        files = []
+        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+            bb = small_blocks.pop(0)
+            self._pending_write_size -= bb.size()
+            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+            files.append((bb, new_bb.write_pointer - bb.size()))
+
+        self.commit_bufferblock(new_bb, sync=sync)
+
+        for bb, new_bb_segment_offset in files:
+            newsegs = bb.owner.segments()
+            for s in newsegs:
+                if s.locator == bb.blockid:
+                    s.locator = new_bb.locator()
+                    s.segment_offset = new_bb_segment_offset+s.segment_offset
+            bb.owner.set_segments(newsegs)
+            self._delete_bufferblock(bb.blockid)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -659,7 +727,12 @@ class _BlockManager(object):
     @synchronized
     def get_padding_block(self):
         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
-        when using truncate() to extend the size of a file."""
+        when using truncate() to extend the size of a file.
+
+        For reference (and possible future optimization), the md5sum of the
+        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
+
+        """
 
         if self.padding_block is None:
             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
@@ -984,32 +1057,6 @@ class ArvadosFile(object):
 
         return ''.join(data)
 
-    def _repack_writes(self, num_retries):
-        """Test if the buffer block has more data than actual segments.
-
-        This happens when a buffered write over-writes a file range written in
-        a previous buffered write. Re-pack the buffer block for efficiency
-        and to avoid leaking information.
-
-        """
-        segs = self._segments
-
-        # Sum up the segments to get the total bytes of the file referencing
-        # into the buffer block.
-        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-        write_total = sum([s.range_size for s in bufferblock_segs])
-
-        if write_total < self._current_bblock.size():
-            # There is more data in the buffer block than is actually accounted for by segments, so
-            # re-pack into a new buffer by copying over to a new buffer block.
-            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
-            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
-            for t in bufferblock_segs:
-                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
-                t.segment_offset = new_bb.size() - t.range_size
-
-            self._current_bblock = new_bb
-
     @must_be_writable
     @synchronized
     def writeto(self, offset, data, num_retries):
@@ -1022,12 +1069,8 @@ class ArvadosFile(object):
         if len(data) == 0:
             return
 
-        print "Writing", len(data), "bytes to offset", offset, "current size is", self.size()
-
         if offset > self.size():
-            print "Need to extend to", offset
             self.truncate(offset)
-            print "Size is now", self.size()
 
         if len(data) > config.KEEP_BLOCK_SIZE:
             # Chunk it up into smaller writes
@@ -1044,7 +1087,7 @@ class ArvadosFile(object):
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-            self._repack_writes(num_retries)
+            self._current_bblock.repack_writes()
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -1071,8 +1114,9 @@ class ArvadosFile(object):
 
         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
-                self._repack_writes(num_retries)
-            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
+                self._current_bblock.repack_writes()
+            if self._current_bblock.state() != _BufferBlock.DELETED:
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
 
         if sync:
             to_delete = set()
@@ -1121,17 +1165,17 @@
                       normalize=False, only_committed=False):
         buf = ""
         filestream = []
-        for segment in self.segments:
+        for segment in self._segments:
            loc = segment.locator
             if self.parent._my_block_manager().is_bufferblock(loc):
                 if only_committed:
                     continue
-                loc = self._bufferblocks[loc].calculate_locator()
+                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
-            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                               segment.segment_offset, segment.range_size))
-        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
+        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
         buf += "\n"
         return buf
 
@@ -1211,6 +1255,9 @@ class ArvadosFileWriter(ArvadosFileReader):
         self.mode = mode
         self.arvadosfile.add_writer(self)
 
+    def writable(self):
+        return True
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def write(self, data, num_retries=None):
@@ -1232,8 +1279,6 @@
         if size is None:
             size = self._filepos
         self.arvadosfile.truncate(size)
-        if self._filepos > self.size():
-            self._filepos = self.size()
 
     @_FileLikeObjectBase._before_close
     def flush(self):
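
Note (not part of the diff): the new _BufferBlock.repack_writes() above relies on the fact that segment metadata is kept in file order while the bytes land in the buffer block in write order, so out-of-order or overwritten writes leave unreferenced bytes and fragmented manifests. The standalone Python sketch below illustrates only that copy-in-file-order idea; it is a toy model, and the names repack, buffer_bytes and segments are made up for the example rather than taken from arvados.

# Toy model of the repack idea: copy only the byte ranges that segments still
# reference into a new buffer, in file order, and rebase the segment offsets.
def repack(buffer_bytes, segments):
    # segments: list of (segment_offset, length) tuples into buffer_bytes,
    # already sorted in file order.
    new_buf = bytearray()
    new_segments = []
    for offset, length in segments:
        new_segments.append((len(new_buf), length))
        new_buf += buffer_bytes[offset:offset + length]
    return bytes(new_buf), new_segments

# The file was written back-to-front, so the buffer holds "world" before "hello ".
buf = b"worldhello "
segs = [(5, 6), (0, 5)]           # file order: "hello " first, then "world"
print(repack(buf, segs))          # -> (b'hello world', [(0, 6), (6, 5)])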
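Note (not part of the diff): the padding-block locator quoted in the new get_padding_block() docstring can be re-derived with a few lines of Python. This is only an illustrative check; KEEP_BLOCK_SIZE = 2**26 is an assumption made here to match the 67108864-byte size in the docstring, not something defined in this diff.

import hashlib

KEEP_BLOCK_SIZE = 2**26  # 67108864 bytes (64 MiB), matching the size in the docstring

# A Keep locator is the md5 hex digest of the block data plus "+<size in bytes>".
digest = hashlib.md5(b"\0" * KEEP_BLOCK_SIZE).hexdigest()
print("%s+%d" % (digest, KEEP_BLOCK_SIZE))
# Per the docstring above, this should print: 7f614da9329cd3aebf59b91aadc30bf0+67108864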