X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6cd3b6fb1c143017c6bca4db899c8c6ddd237572..1cd7fd3867acabeb29196da4cf505a0eb703b287:/sdk/python/arvados/arvfile.py

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index c82030923b..95dcea0ad3 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -2,17 +2,19 @@ import functools
 import os
 import zlib
 import bz2
-from ._ranges import locators_and_ranges, replace_range, Range
-from arvados.retry import retry_method
 import config
 import hashlib
 import threading
 import Queue
 import copy
 import errno
-from .errors import KeepWriteError, AssertionError
+import re
+
+from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
-from _normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream
+from ._ranges import locators_and_ranges, replace_range, Range
+from .retry import retry_method
 
 def split(path):
     """split(path) -> streamname, filename
@@ -75,7 +77,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         return re.sub('\.(bz2|gz)$', '', self.name)
 
     @_FileLikeObjectBase._before_close
-    def seek(self, pos, whence=os.SEEK_CUR):
+    def seek(self, pos, whence=os.SEEK_SET):
         if whence == os.SEEK_CUR:
             pos += self._filepos
         elif whence == os.SEEK_END:
@@ -303,7 +305,7 @@ class _BufferBlock(object):
             self.buffer_view = None
             self.buffer_block = None
         else:
-            raise AssertionError("Invalid state change from %s to %s" % (self.state, state))
+            raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate))
 
     @synchronized
     def state(self):
@@ -482,9 +484,10 @@ class _BlockManager(object):
                     thread.daemon = True
                     thread.start()
 
-        # Mark the block as PENDING so to disallow any more appends.
-        block.set_state(_BufferBlock.PENDING)
-        self._put_queue.put(block)
+        if block.state() == _BufferBlock.WRITABLE:
+            # Mark the block as PENDING so to disallow any more appends.
+            block.set_state(_BufferBlock.PENDING)
+            self._put_queue.put(block)
 
     @synchronized
     def get_bufferblock(self, locator):
@@ -521,8 +524,7 @@ class _BlockManager(object):
             items = self._bufferblocks.items()
 
         for k,v in items:
-            if v.state() == _BufferBlock.WRITABLE:
-                self.commit_bufferblock(v)
+            v.owner.flush()
 
         with self.lock:
             if self._put_queue is not None:
@@ -701,7 +703,7 @@ class ArvadosFile(object):
                     # segment is past the trucate size, all done
                     break
                 elif size < range_end:
-                    nr = Range(r.locator, r.range_start, size - r.range_start)
+                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                     nr.segment_offset = r.segment_offset
                     new_segs.append(nr)
                     break
@@ -713,8 +715,15 @@ class ArvadosFile(object):
         elif size > self.size():
             raise IOError("truncate() does not support extending the file size")
 
-    def readfrom(self, offset, size, num_retries):
-        """Read upto `size` bytes from the file starting at `offset`."""
+
+    def readfrom(self, offset, size, num_retries, exact=False):
+        """Read up to `size` bytes from the file starting at `offset`.
+
+        :exact:
+         If False (default), return less data than requested if the read
+         crosses a block boundary and the next block isn't cached. If True,
+         only return less data than requested when hitting EOF.
+        """
 
         with self.lock:
             if size == 0 or offset >= self.size():
@@ -727,14 +736,14 @@ class ArvadosFile(object):
 
         data = []
         for lr in readsegs:
-            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
+            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
             else:
                 break
         return ''.join(data)
 
-    def _repack_writes(self):
+    def _repack_writes(self, num_retries):
         """Test if the buffer block has more data than actual segments.
 
         This happens when a buffered write over-writes a file range written in
@@ -752,9 +761,10 @@ class ArvadosFile(object):
         if write_total < self._current_bblock.size():
             # There is more data in the buffer block than is actually accounted for by segments, so
             # re-pack into a new buffer by copying over to a new buffer block.
+            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
             for t in bufferblock_segs:
-                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
+                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                 t.segment_offset = new_bb.size() - t.range_size
 
             self._current_bblock = new_bb
@@ -775,7 +785,13 @@ class ArvadosFile(object):
             raise ArgumentError("Offset is past the end of the file")
 
         if len(data) > config.KEEP_BLOCK_SIZE:
-            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
+            # Chunk it up into smaller writes
+            n = 0
+            dataview = memoryview(data)
+            while n < len(data):
+                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
+                n += config.KEEP_BLOCK_SIZE
+            return
 
         self._modified = True
 
@@ -783,7 +799,7 @@ class ArvadosFile(object):
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-            self._repack_writes()
+            self._repack_writes(num_retries)
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -793,9 +809,9 @@ class ArvadosFile(object):
         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
 
     @synchronized
-    def flush(self):
+    def flush(self, num_retries=0):
         if self._current_bblock:
-            self._repack_writes()
+            self._repack_writes(num_retries)
             self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
 
     @must_be_writable
@@ -813,7 +829,7 @@ class ArvadosFile(object):
         """Internal implementation of add_segment."""
         self._modified = True
         for lr in locators_and_ranges(blocks, pos, size):
-            last = self._segments[-1] if self._segments else Range(0, 0, 0)
+            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
             self._segments.append(r)
 
@@ -865,7 +881,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     @retry_method
     def read(self, size, num_retries=None):
         """Read up to `size` bytes from the stream, starting at the current file position."""
-        data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
+        data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
        self._filepos += len(data)
        return data
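
Editor's note (not part of the diff): the new branch added to ArvadosFile.writeto() replaces the old "chunk too large" ArgumentError with a loop that slices an oversized write into config.KEEP_BLOCK_SIZE pieces and recurses. The following is a minimal standalone sketch of that slicing pattern only; BLOCK_SIZE and write_block() are hypothetical stand-ins for config.KEEP_BLOCK_SIZE and the recursive self.writeto() call, and nothing here uses the Arvados API.

    # Sketch (assumption-labelled, not Arvados code): split an oversized write
    # into block-sized pieces, mirroring the memoryview/tobytes() loop in the diff.
    BLOCK_SIZE = 4  # hypothetical, tiny for demonstration; the real value is config.KEEP_BLOCK_SIZE

    def chunked_write(offset, data, write_block):
        """Hand `data` to write_block() in pieces no larger than BLOCK_SIZE."""
        n = 0
        dataview = memoryview(data)
        while n < len(data):
            # tobytes() copies the slice out of the memoryview, as in the diff
            write_block(offset + n, dataview[n:n + BLOCK_SIZE].tobytes())
            n += BLOCK_SIZE

    chunks = []
    chunked_write(0, b"0123456789", lambda off, piece: chunks.append((off, piece)))
    print(chunks)  # [(0, b'0123'), (4, b'4567'), (8, b'89')]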