def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.
    """
    # Split on the LAST slash so nested stream names stay intact.
    cut = path.rfind('/')
    if cut < 0:
        # No directory component at all: default the stream to '.'.
        return '.', path
    return path[:cut], path[cut + 1:]
- def __call__(self): - return self - +class ArvadosFileReaderBase(_FileLikeObjectBase): def __init__(self, name, mode, num_retries=None): - super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode) + super(ArvadosFileReaderBase, self).__init__(name, mode) self._filepos = 0L self.num_retries = num_retries self._readline_cache = (None, None) @@ -79,7 +74,7 @@ class ArvadosFileReaderBase(FileLikeObjectBase): def decompressed_name(self): return re.sub('\.(bz2|gz)$', '', self.name) - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close def seek(self, pos, whence=os.SEEK_CUR): if whence == os.SEEK_CUR: pos += self._filepos @@ -90,7 +85,7 @@ class ArvadosFileReaderBase(FileLikeObjectBase): def tell(self): return self._filepos - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readall(self, size=2**20, num_retries=None): while True: @@ -99,7 +94,7 @@ class ArvadosFileReaderBase(FileLikeObjectBase): break yield data - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readline(self, size=float('inf'), num_retries=None): cache_pos, cache_data = self._readline_cache @@ -123,7 +118,7 @@ class ArvadosFileReaderBase(FileLikeObjectBase): self._readline_cache = (self.tell(), data[nextline_index:]) return data[:nextline_index] - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close @retry_method def decompress(self, decompress, size, num_retries=None): for segment in self.readall(size, num_retries): @@ -131,7 +126,7 @@ class ArvadosFileReaderBase(FileLikeObjectBase): if data: yield data - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readall_decompressed(self, size=2**20, num_retries=None): self.seek(0) @@ -146,7 +141,7 @@ class ArvadosFileReaderBase(FileLikeObjectBase): else: return self.readall(size, num_retries=num_retries) - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close 
@retry_method def readlines(self, sizehint=float('inf'), num_retries=None): data = [] @@ -169,8 +164,15 @@ class ArvadosFileReaderBase(FileLikeObjectBase): class StreamFileReader(ArvadosFileReaderBase): + class _NameAttribute(str): + # The Python file API provides a plain .name attribute. + # Older SDK provided a name() method. + # This class provides both, for maximum compatibility. + def __call__(self): + return self + def __init__(self, stream, segments, name): - super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries) + super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries) self._stream = stream self.segments = segments @@ -181,7 +183,7 @@ class StreamFileReader(ArvadosFileReaderBase): n = self.segments[-1] return n.range_start + n.range_size - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close @retry_method def read(self, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at the current file position""" @@ -192,14 +194,14 @@ class StreamFileReader(ArvadosFileReaderBase): available_chunks = locators_and_ranges(self.segments, self._filepos, size) if available_chunks: lr = available_chunks[0] - data = self._stream._readfrom(lr.locator+lr.segment_offset, + data = self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries) self._filepos += len(data) return data - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readfrom(self, start, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at 'start'""" @@ -208,21 +210,26 @@ class StreamFileReader(ArvadosFileReaderBase): data = [] for lr in locators_and_ranges(self.segments, start, size): - data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size, + data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries)) return ''.join(data) def 
def synchronized(orig_func):
    """Decorator: serialize calls to a method under the instance's lock.

    The decorated callable must be used as a method of an object that
    exposes a ``lock`` attribute supporting acquire()/release().
    """
    @functools.wraps(orig_func)
    def _locked_call(self, *args, **kwargs):
        self.lock.acquire()
        try:
            return orig_func(self, *args, **kwargs)
        finally:
            # Release even if the wrapped method raises.
            self.lock.release()
    return _locked_call
""" - if self.state == BufferBlock.WRITABLE: + if self._state == _BufferBlock.WRITABLE: while (self.write_pointer+len(data)) > len(self.buffer_block): new_buffer_block = bytearray(len(self.buffer_block) * 2) new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer] @@ -284,23 +293,41 @@ class BufferBlock(object): else: raise AssertionError("Buffer block is not writable") + @synchronized + def set_state(self, nextstate, loc=None): + if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or + (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)): + self._state = nextstate + if self._state == _BufferBlock.COMMITTED: + self._locator = loc + self.buffer_view = None + self.buffer_block = None + else: + raise AssertionError("Invalid state change from %s to %s" % (self.state, state)) + + @synchronized + def state(self): + return self._state + def size(self): """The amount of data written to the buffer.""" return self.write_pointer + @synchronized def locator(self): """The Keep locator for this buffer's contents.""" if self._locator is None: self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size()) return self._locator + @synchronized + def clone(self, new_blockid, owner): + if self._state == _BufferBlock.COMMITTED: + raise AssertionError("Can only duplicate a writable or pending buffer block") + bufferblock = _BufferBlock(new_blockid, self.size(), owner) + bufferblock.append(self.buffer_view[0:self.size()]) + return bufferblock -def synchronized(orig_func): - @functools.wraps(orig_func) - def synchronized_wrapper(self, *args, **kwargs): - with self.lock: - return orig_func(self, *args, **kwargs) - return synchronized_wrapper class NoopLock(object): def __enter__(self): @@ -315,22 +342,20 @@ class NoopLock(object): def release(self): pass -SYNC_READONLY = 1 -SYNC_EXPLICIT = 2 -SYNC_LIVE = 3 - def must_be_writable(orig_func): @functools.wraps(orig_func) 
def must_be_writable_wrapper(self, *args, **kwargs): - if self.sync_mode() == SYNC_READONLY: - raise IOError((errno.EROFS, "Collection is read only")) + if not self.writable(): + raise IOError((errno.EROFS, "Collection must be writable.")) return orig_func(self, *args, **kwargs) return must_be_writable_wrapper -class BlockManager(object): - """BlockManager handles buffer blocks, background block uploads, and background - block prefetch for a Collection of ArvadosFiles. +class _BlockManager(object): + """BlockManager handles buffer blocks. + + Also handles background block uploads, and background block prefetch for a + Collection of ArvadosFiles. """ def __init__(self, keep): @@ -363,29 +388,23 @@ class BlockManager(object): """ if blockid is None: blockid = "bufferblock%i" % len(self._bufferblocks) - bufferblock = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) + bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @synchronized - def dup_block(self, blockid, owner): - """Create a new bufferblock in WRITABLE state, initialized with the content of - an existing bufferblock. + def dup_block(self, block, owner): + """Create a new bufferblock initialized with the content of an existing bufferblock. - :blockid: - the block to copy. May be an existing buffer block id. + :block: + the buffer block to copy. 
:owner: ArvadosFile that owns the new block """ new_blockid = "bufferblock%i" % len(self._bufferblocks) - block = self._bufferblocks[blockid] - if block.state != BufferBlock.WRITABLE: - raise AssertionError("Can only duplicate a writable buffer block") - - bufferblock = BufferBlock(new_blockid, block.size(), owner) - bufferblock.append(block.buffer_view[0:block.size()]) + bufferblock = block.clone(new_blockid, owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -430,12 +449,10 @@ class BlockManager(object): bufferblock = self._put_queue.get() if bufferblock is None: return - bufferblock._locator = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) - bufferblock.state = BufferBlock.COMMITTED - bufferblock.buffer_view = None - bufferblock.buffer_block = None + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + bufferblock.set_state(_BufferBlock.COMMITTED, loc) + except Exception as e: - print e self._put_errors.put((bufferblock.locator(), e)) finally: if self._put_queue is not None: @@ -465,12 +482,12 @@ class BlockManager(object): thread.start() # Mark the block as PENDING so to disallow any more appends. - block.state = BufferBlock.PENDING + block.set_state(_BufferBlock.PENDING) self._put_queue.put(block) + @synchronized def get_bufferblock(self, locator): - with self.lock: - return self._bufferblocks.get(locator) + return self._bufferblocks.get(locator) def get_block_contents(self, locator, num_retries, cache_only=False): """Fetch a block. 
@@ -482,7 +499,7 @@ class BlockManager(object): with self.lock: if locator in self._bufferblocks: bufferblock = self._bufferblocks[locator] - if bufferblock.state != BufferBlock.COMMITTED: + if bufferblock.state() != _BufferBlock.COMMITTED: return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes() else: locator = bufferblock._locator @@ -503,12 +520,13 @@ class BlockManager(object): items = self._bufferblocks.items() for k,v in items: - if v.state == BufferBlock.WRITABLE: + if v.state() == _BufferBlock.WRITABLE: self.commit_bufferblock(v) with self.lock: if self._put_queue is not None: self._put_queue.join() + if not self._put_errors.empty(): err = [] try: @@ -516,7 +534,7 @@ class BlockManager(object): err.append(self._put_errors.get(False)) except Queue.Empty: pass - raise KeepWriteError("Error writing some blocks", err) + raise KeepWriteError("Error writing some blocks", err, label="block") def block_prefetch(self, locator): """Initiate a background download of a block. @@ -539,7 +557,7 @@ class BlockManager(object): if b is None: return self._keep.get(b) - except: + except Exception: pass with self.lock: @@ -557,7 +575,9 @@ class BlockManager(object): class ArvadosFile(object): - """ArvadosFile manages the underlying representation of a file in Keep as a + """Represent a file in a Collection. + + ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access. 
@@ -583,8 +603,8 @@ class ArvadosFile(object): self._add_segment(stream, s.locator, s.range_size) self._current_bblock = None - def sync_mode(self): - return self.parent.sync_mode() + def writable(self): + return self.parent.writable() @synchronized def segments(self): @@ -594,28 +614,29 @@ class ArvadosFile(object): def clone(self, new_parent): """Make a copy of this file.""" cp = ArvadosFile(new_parent) - - map_loc = {} - for r in self._segments: - new_loc = r.locator - if self.parent._my_block_manager().is_bufferblock(r.locator): - if r.locator not in map_loc: - bufferblock = get_bufferblock(r.locator) - if bufferblock.state == BufferBlock.COMITTED: - map_loc[r.locator] = bufferblock.locator() - else: - map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp) - new_loc = map_loc[r.locator] - - cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset)) - + cp.replace_contents(self) return cp @must_be_writable @synchronized def replace_contents(self, other): """Replace segments of this file with segments from another `ArvadosFile` object.""" - self._segments = other.segments() + + map_loc = {} + self._segments = [] + for other_segment in other.segments(): + new_loc = other_segment.locator + if other.parent._my_block_manager().is_bufferblock(other_segment.locator): + if other_segment.locator not in map_loc: + bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator) + if bufferblock.state() != _BufferBlock.WRITABLE: + map_loc[other_segment.locator] = bufferblock.locator() + else: + map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid + new_loc = map_loc[other_segment.locator] + + self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) + self._modified = True def __eq__(self, other): @@ -624,9 +645,29 @@ class ArvadosFile(object): if not isinstance(other, ArvadosFile): return False 
- s = other.segments() + othersegs = other.segments() with self.lock: - return self._segments == s + if len(self._segments) != len(othersegs): + return False + for i in xrange(0, len(othersegs)): + seg1 = self._segments[i] + seg2 = othersegs[i] + loc1 = seg1.locator + loc2 = seg2.locator + + if self.parent._my_block_manager().is_bufferblock(loc1): + loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator() + + if other.parent._my_block_manager().is_bufferblock(loc2): + loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator() + + if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or + seg1.range_start != seg2.range_start or + seg1.range_size != seg2.range_size or + seg1.segment_offset != seg2.segment_offset): + return False + + return True def __ne__(self, other): return not self.__eq__(other) @@ -693,8 +734,7 @@ class ArvadosFile(object): return ''.join(data) def _repack_writes(self): - """Test if the buffer block has more data than is referenced by actual - segments. + """Test if the buffer block has more data than actual segments. This happens when a buffered write over-writes a file range written in a previous buffered write. 
Re-pack the buffer block for efficiency @@ -738,7 +778,7 @@ class ArvadosFile(object): self._modified = True - if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE: + if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: @@ -751,11 +791,20 @@ class ArvadosFile(object): replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) + @synchronized + def flush(self): + if self._current_bblock: + self._repack_writes() + self.parent._my_block_manager().commit_bufferblock(self._current_bblock) + + @must_be_writable @synchronized def add_segment(self, blocks, pos, size): - """Add a segment to the end of the file, with `pos` and `offset` referencing a - section of the stream described by `blocks` (a list of Range objects) + """Add a segment to the end of the file. + + `pos` and `offset` reference a section of the stream described by + `blocks` (a list of Range objects) """ self._add_segment(blocks, pos, size) @@ -777,6 +826,25 @@ class ArvadosFile(object): else: return 0 + + @synchronized + def manifest_text(self, stream_name=".", portable_locators=False, normalize=False): + buf = "" + item = self + filestream = [] + for segment in item.segments: + loc = segment.locator + if loc.startswith("bufferblock"): + loc = item._bufferblocks[loc].calculate_locator() + if portable_locators: + loc = KeepLocator(loc).stripped() + filestream.append(LocatorAndRange(loc, locator_block_size(loc), + segment.segment_offset, segment.range_size)) + buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream})) + buf += "\n" + return buf + + class ArvadosFileReader(ArvadosFileReaderBase): """Wraps ArvadosFile in a file-like object supporting reading only. 
@@ -792,7 +860,10 @@ class ArvadosFileReader(ArvadosFileReaderBase): def size(self): return self.arvadosfile.size() - @FileLikeObjectBase._before_close + def stream_name(self): + return self.arvadosfile.parent.stream_name() + + @_FileLikeObjectBase._before_close @retry_method def read(self, size, num_retries=None): """Read up to `size` bytes from the stream, starting at the current file position.""" @@ -800,7 +871,7 @@ class ArvadosFileReader(ArvadosFileReaderBase): self._filepos += len(data) return data - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readfrom(self, offset, size, num_retries=None): """Read up to `size` bytes from the stream, starting at the current file position.""" @@ -821,7 +892,7 @@ class ArvadosFileWriter(ArvadosFileReader): def __init__(self, arvadosfile, name, mode, num_retries=None): super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries) - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close @retry_method def write(self, data, num_retries=None): if self.mode[0] == "a": @@ -830,12 +901,13 @@ class ArvadosFileWriter(ArvadosFileReader): self.arvadosfile.writeto(self._filepos, data, num_retries) self._filepos += len(data) - @FileLikeObjectBase._before_close + @_FileLikeObjectBase._before_close @retry_method def writelines(self, seq, num_retries=None): for s in seq: self.write(s, num_retries) + @_FileLikeObjectBase._before_close def truncate(self, size=None): if size is None: size = self._filepos @@ -843,6 +915,11 @@ class ArvadosFileWriter(ArvadosFileReader): if self._filepos > self.size(): self._filepos = self.size() + @_FileLikeObjectBase._before_close + def flush(self): + self.arvadosfile.flush() + + @_FileLikeObjectBase._before_close def close(self): - if self.arvadosfile.parent.sync_mode() == SYNC_LIVE: - self.arvadosfile.parent.root_collection().save() + self.flush() + super(ArvadosFileWriter, self).close()