X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/93d38f3cb3d7874ed3723409357d8dd5e8d618a5..f78b9c586f9ffa9846d68d35a10396261691da41:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index d9d9cd287a..53ae4a836c 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -2,24 +2,26 @@ import functools import os import zlib import bz2 -from ._ranges import locators_and_ranges, replace_range, Range -from arvados.retry import retry_method import config import hashlib -import hashlib import threading import Queue import copy import errno +import re + from .errors import KeepWriteError, AssertionError from .keep import KeepLocator -from _normalize_stream import normalize_stream +from ._normalize_stream import normalize_stream +from ._ranges import locators_and_ranges, replace_range, Range +from .retry import retry_method def split(path): - """Separate the stream name and file name in a /-separated stream path and - return a tuple (stream_name, file_name). + """split(path) -> streamname, filename - If no stream name is available, assume '.'. + Separate the stream name and file name in a /-separated stream path and + return a tuple (stream_name, file_name). If no stream name is available, + assume '.'. """ try: @@ -58,15 +60,8 @@ class _FileLikeObjectBase(object): class ArvadosFileReaderBase(_FileLikeObjectBase): - class _NameAttribute(str): - # The Python file API provides a plain .name attribute. - # Older SDK provided a name() method. - # This class provides both, for maximum compatibility. - def __call__(self): - return self - def __init__(self, name, mode, num_retries=None): - super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode) + super(ArvadosFileReaderBase, self).__init__(name, mode) self._filepos = 0L self.num_retries = num_retries self._readline_cache = (None, None) @@ -171,8 +166,15 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): class StreamFileReader(ArvadosFileReaderBase): + class _NameAttribute(str): + # The Python file API provides a plain .name attribute. + # Older SDK provided a name() method. + # This class provides both, for maximum compatibility. + def __call__(self): + return self + def __init__(self, stream, segments, name): - super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries) + super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries) self._stream = stream self.segments = segments @@ -194,7 +196,7 @@ class StreamFileReader(ArvadosFileReaderBase): available_chunks = locators_and_ranges(self.segments, self._filepos, size) if available_chunks: lr = available_chunks[0] - data = self._stream._readfrom(lr.locator+lr.segment_offset, + data = self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries) @@ -210,7 +212,7 @@ class StreamFileReader(ArvadosFileReaderBase): data = [] for lr in locators_and_ranges(self.segments, start, size): - data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size, + data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries)) return ''.join(data) @@ -229,8 +231,7 @@ def synchronized(orig_func): return synchronized_wrapper class _BufferBlock(object): - """A BufferBlock is a stand-in for a Keep block that is in the process of being - written. + """A stand-in for a Keep block that is in the process of being written. Writers can append to it, get the size, and compute the Keep locator. There are three valid states: @@ -321,6 +322,14 @@ class _BufferBlock(object): self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size()) return self._locator + @synchronized + def clone(self, new_blockid, owner): + if self._state == _BufferBlock.COMMITTED: + raise AssertionError("Can only duplicate a writable or pending buffer block") + bufferblock = _BufferBlock(new_blockid, self.size(), owner) + bufferblock.append(self.buffer_view[0:self.size()]) + return bufferblock + class NoopLock(object): def __enter__(self): @@ -335,22 +344,21 @@ class NoopLock(object): def release(self): pass -SYNC_READONLY = 1 -SYNC_EXPLICIT = 2 -SYNC_LIVE = 3 def must_be_writable(orig_func): @functools.wraps(orig_func) def must_be_writable_wrapper(self, *args, **kwargs): - if self.sync_mode() == SYNC_READONLY: - raise IOError((errno.EROFS, "Collection is read only")) + if not self.writable(): + raise IOError((errno.EROFS, "Collection must be writable.")) return orig_func(self, *args, **kwargs) return must_be_writable_wrapper class _BlockManager(object): - """BlockManager handles buffer blocks, background block uploads, and background - block prefetch for a Collection of ArvadosFiles. + """BlockManager handles buffer blocks. + + Also handles background block uploads, and background block prefetch for a + Collection of ArvadosFiles. """ def __init__(self, keep): @@ -389,8 +397,7 @@ class _BlockManager(object): @synchronized def dup_block(self, block, owner): - """Create a new bufferblock in WRITABLE state, initialized with the content of - an existing bufferblock. + """Create a new bufferblock initialized with the content of an existing bufferblock. :block: the buffer block to copy. @@ -400,12 +407,7 @@ class _BlockManager(object): """ new_blockid = "bufferblock%i" % len(self._bufferblocks) - with block.lock: - if block._state == _BufferBlock.COMMITTED: - raise AssertionError("Can only duplicate a writable or pending buffer block") - - bufferblock = _BufferBlock(new_blockid, block.size(), owner) - bufferblock.append(block.buffer_view[0:block.size()]) + bufferblock = block.clone(new_blockid, owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -454,7 +456,6 @@ class _BlockManager(object): bufferblock.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: - print e self._put_errors.put((bufferblock.locator(), e)) finally: if self._put_queue is not None: @@ -536,7 +537,7 @@ class _BlockManager(object): err.append(self._put_errors.get(False)) except Queue.Empty: pass - raise KeepWriteError("Error writing some blocks", err) + raise KeepWriteError("Error writing some blocks", err, label="block") def block_prefetch(self, locator): """Initiate a background download of a block. @@ -559,7 +560,7 @@ class _BlockManager(object): if b is None: return self._keep.get(b) - except: + except Exception: pass with self.lock: @@ -577,7 +578,9 @@ class _BlockManager(object): class ArvadosFile(object): - """ArvadosFile manages the underlying representation of a file in Keep as a + """Represent a file in a Collection. + + ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access. @@ -603,8 +606,8 @@ class ArvadosFile(object): self._add_segment(stream, s.locator, s.range_size) self._current_bblock = None - def sync_mode(self): - return self.parent.sync_mode() + def writable(self): + return self.parent.writable() @synchronized def segments(self): @@ -734,8 +737,7 @@ class ArvadosFile(object): return ''.join(data) def _repack_writes(self): - """Test if the buffer block has more data than is referenced by actual - segments. + """Test if the buffer block has more data than actual segments. This happens when a buffered write over-writes a file range written in a previous buffered write. Re-pack the buffer block for efficiency @@ -792,11 +794,19 @@ class ArvadosFile(object): replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) + @synchronized + def flush(self): + if self._current_bblock: + self._repack_writes() + self.parent._my_block_manager().commit_bufferblock(self._current_bblock) + @must_be_writable @synchronized def add_segment(self, blocks, pos, size): - """Add a segment to the end of the file, with `pos` and `offset` referencing a - section of the stream described by `blocks` (a list of Range objects) + """Add a segment to the end of the file. + + `pos` and `offset` reference a section of the stream described by + `blocks` (a list of Range objects) """ self._add_segment(blocks, pos, size) @@ -818,22 +828,19 @@ class ArvadosFile(object): else: return 0 - @synchronized def manifest_text(self, stream_name=".", portable_locators=False, normalize=False): buf = "" - item = self filestream = [] - for segment in item.segments: + for segment in self.segments: loc = segment.locator if loc.startswith("bufferblock"): - loc = item._bufferblocks[loc].calculate_locator() + loc = self._bufferblocks[loc].calculate_locator() if portable_locators: loc = KeepLocator(loc).stripped() filestream.append(LocatorAndRange(loc, locator_block_size(loc), segment.segment_offset, segment.range_size)) - stream[stream_name] = filestream - buf += ' '.join(normalize_stream(stream_name, stream)) + buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream})) buf += "\n" return buf @@ -900,6 +907,7 @@ class ArvadosFileWriter(ArvadosFileReader): for s in seq: self.write(s, num_retries) + @_FileLikeObjectBase._before_close def truncate(self, size=None): if size is None: size = self._filepos @@ -907,6 +915,11 @@ class ArvadosFileWriter(ArvadosFileReader): if self._filepos > self.size(): self._filepos = self.size() + @_FileLikeObjectBase._before_close + def flush(self): + self.arvadosfile.flush() + def close(self): - if self.arvadosfile.parent.sync_mode() == SYNC_LIVE: - self.arvadosfile.parent.root_collection().save() + if not self.closed: + self.flush() + super(ArvadosFileWriter, self).close()