stream_name, file_name = '.', path
return stream_name, file_name
+
+class UnownedBlockError(Exception):
+ """Raised when there's an writable block without an owner on the BlockManager."""
+ pass
+
+
class _FileLikeObjectBase(object):
def __init__(self, name, mode):
self.name = name
pos += self._filepos
elif whence == os.SEEK_END:
pos += self.size()
- self._filepos = min(max(pos, 0L), self.size())
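+ # Seeking beyond end-of-file is allowed, as with a regular OS file;
+ # only negative target offsets are rejected.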
+ if pos < 0L:
+ raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
+ self._filepos = pos
+ return self._filepos
def tell(self):
return self._filepos
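+
+ # io.IOBase-style capability queries: this reader supports reading and
+ # seeking, but not writing.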
+ def readable(self):
+ return True
+
+ def writable(self):
+ return False
+
+ def seekable(self):
+ return True
+
@_FileLikeObjectBase._before_close
@retry_method
def readall(self, size=2**20, num_retries=None):
return ''.join(data).splitlines(True)
def size(self):
- raise NotImplementedError()
+ raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
- raise NotImplementedError()
+ raise IOError(errno.ENOSYS, "Not implemented")
def readfrom(self, start, size, num_retries=None):
- raise NotImplementedError()
+ raise IOError(errno.ENOSYS, "Not implemented")
class StreamFileReader(ArvadosFileReaderBase):
self.copies = copies
self._pending_write_size = 0
self.threads_lock = threading.Lock()
+ self.padding_block = None
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
# A WRITABLE block always has an owner.
# A WRITABLE block with its owner.closed() implies that its
# size is <= KEEP_BLOCK_SIZE/2.
- small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ try:
+ small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ except AttributeError:
+ # Writable blocks without an owner shouldn't exist.
+ raise UnownedBlockError()
if len(small_blocks) <= 1:
# Not enough small blocks for repacking
def get_bufferblock(self, locator):
return self._bufferblocks.get(locator)
+ @synchronized
+ def get_padding_block(self):
+ """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
+ when using truncate() to extend the size of a file.
+
+ For reference (and possible future optimization), the md5sum of the
+ padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
+
+ """
+
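+ # Allocate the padding block lazily and share one instance across all
+ # files. A newly allocated buffer block is zero-filled, so advancing
+ # write_pointer marks the whole block as written without copying any
+ # data; committing it stores the zeros in Keep for use in manifests.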
+ if self.padding_block is None:
+ self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
+ self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
+ self.commit_bufferblock(self.padding_block, False)
+ return self.padding_block
+
@synchronized
def delete_bufferblock(self, locator):
self._delete_bufferblock(locator)
self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
- self._committed = False
+ self.set_committed(False)
def __eq__(self, other):
if other is self:
self._segments = segs
@synchronized
- def set_committed(self):
- """Set committed flag to True"""
- self._committed = True
+ def set_committed(self, value=True):
+ """Set committed flag.
+
+ If value is True, set committed to be True.
+
+ If value is False, set committed to be False for this and all parents.
+ """
+ if value == self._committed:
+ return
+ self._committed = value
+ if self._committed is False and self.parent is not None:
+ self.parent.set_committed(False)
@synchronized
def committed(self):
@must_be_writable
@synchronized
def truncate(self, size):
- """Shrink the size of the file.
+ """Shrink or expand the size of the file.
If `size` is less than the size of the file, the file contents after
`size` will be discarded. If `size` is greater than the current size
- of the file, an IOError will be raised.
+ of the file, it will be filled with zero bytes.
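+
+ For example, truncating a 3-byte file to 5 bytes leaves the original
+ 3 bytes in place followed by 2 zero bytes.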
"""
if size < self.size():
new_segs.append(r)
self._segments = new_segs
- self._committed = False
+ self.set_committed(False)
elif size > self.size():
- raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
+ padding = self.parent._my_block_manager().get_padding_block()
+ diff = size - self.size()
+ while diff > config.KEEP_BLOCK_SIZE:
+ self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
+ diff -= config.KEEP_BLOCK_SIZE
+ if diff > 0:
+ self._segments.append(Range(padding.blockid, self.size(), diff, 0))
+ self.set_committed(False)
+ else:
+ # size == self.size()
+ pass
def readfrom(self, offset, size, num_retries, exact=False):
"""Read up to `size` bytes from the file starting at `offset`.
return ''.join(data)
def _repack_writes(self, num_retries):
- """Test if the buffer block has more data than actual segments.
+ """Optimize buffer block by repacking segments in file sequence.
- This happens when a buffered write over-writes a file range written in
- a previous buffered write. Re-pack the buffer block for efficiency
- and to avoid leaking information.
+ When the client makes random writes, they appear in the buffer block in
+ the sequence they were written rather than the sequence they appear in
+ the file. This makes for inefficient, fragmented manifests. Attempt
+ to optimize by repacking writes in file sequence.
"""
segs = self._segments
- # Sum up the segments to get the total bytes of the file referencing
- # into the buffer block.
+ # Collect the segments that reference the buffer block.
bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
+
+ # Collect total data referenced by segments (could be smaller than
+ # bufferblock size if a portion of the file was written and
+ # then overwritten).
write_total = sum([s.range_size for s in bufferblock_segs])
- if write_total < self._current_bblock.size():
- # There is more data in the buffer block than is actually accounted for by segments, so
- # re-pack into a new buffer by copying over to a new buffer block.
+ if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
+ # If the buffer block holds data that is no longer referenced by any
+ # segment (overwritten ranges), or more than one segment references
+ # it (out-of-order writes), it would produce a fragmented manifest,
+ # so try to optimize by re-packing into a new buffer.
contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
for t in bufferblock_segs:
return
if offset > self.size():
- raise ArgumentError("Offset is past the end of the file")
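+ # Writing past the current end of file: first zero-fill the gap up
+ # to `offset` using truncate(), then continue with the write below.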
+ self.truncate(offset)
if len(data) > config.KEEP_BLOCK_SIZE:
# Chunk it up into smaller writes
n += config.KEEP_BLOCK_SIZE
return
- self._committed = False
+ self.set_committed(False)
if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
def _add_segment(self, blocks, pos, size):
"""Internal implementation of add_segment."""
- self._committed = False
+ self.set_committed(False)
for lr in locators_and_ranges(blocks, pos, size):
last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
return 0
@synchronized
- def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+ def manifest_text(self, stream_name=".", portable_locators=False,
+ normalize=False, only_committed=False):
buf = ""
filestream = []
for segment in self.segments:
loc = segment.locator
- if loc.startswith("bufferblock"):
+ if self.parent._my_block_manager().is_bufferblock(loc):
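+ # This segment's data is still in an uncommitted buffer block; skip
+ # it if the caller asked for only the data already stored in Keep.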
+ if only_committed:
+ continue
loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
@must_be_writable
@synchronized
def _reparent(self, newparent, newname):
- self._committed = False
+ self.set_committed(False)
self.flush(sync=True)
self.parent.remove(self.name)
self.parent = newparent
self.mode = mode
self.arvadosfile.add_writer(self)
+ def writable(self):
+ return True
+
@_FileLikeObjectBase._before_close
@retry_method
def write(self, data, num_retries=None):
if size is None:
size = self._filepos
self.arvadosfile.truncate(size)
- if self._filepos > self.size():
- self._filepos = self.size()
@_FileLikeObjectBase._before_close
def flush(self):