X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/130b8d3147af5240ba3118c56b0aa1f19d2d5c59..3513c7def7eacdeef16c355f1b9be93830dcf946:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 2edd2e92a3..85366d2fdc 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -2,21 +2,34 @@ import functools import os import zlib import bz2 -from .ranges import * -from arvados.retry import retry_method import config import hashlib -import hashlib import threading import Queue import copy import errno +import re +import logging +import collections + +from .errors import KeepWriteError, AssertionError, ArgumentError +from .keep import KeepLocator +from ._normalize_stream import normalize_stream +from ._ranges import locators_and_ranges, replace_range, Range +from .retry import retry_method + +MOD = "mod" +WRITE = "write" + +_logger = logging.getLogger('arvados.arvfile') def split(path): """split(path) -> streamname, filename - Separate the stream name and file name in a /-separated stream path. - If no stream name is available, assume '.'. + Separate the stream name and file name in a /-separated stream path and + return a tuple (stream_name, file_name). If no stream name is available, + assume '.'. + """ try: stream_name, file_name = path.rsplit('/', 1) @@ -24,7 +37,7 @@ def split(path): stream_name, file_name = '.', path return stream_name, file_name -class ArvadosFileBase(object): +class _FileLikeObjectBase(object): def __init__(self, name, mode): self.name = name self.mode = mode @@ -33,11 +46,11 @@ class ArvadosFileBase(object): @staticmethod def _before_close(orig_func): @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): + def before_close_wrapper(self, *args, **kwargs): if self.closed: raise ValueError("I/O operation on closed stream file") return orig_func(self, *args, **kwargs) - return wrapper + return before_close_wrapper def __enter__(self): return self @@ -53,16 +66,9 @@ class ArvadosFileBase(object): self.closed = True -class ArvadosFileReaderBase(ArvadosFileBase): - class _NameAttribute(str): - # The Python file API provides a plain .name attribute. - # Older SDK provided a name() method. - # This class provides both, for maximum compatibility. - def __call__(self): - return self - +class ArvadosFileReaderBase(_FileLikeObjectBase): def __init__(self, name, mode, num_retries=None): - super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode) + super(ArvadosFileReaderBase, self).__init__(name, mode) self._filepos = 0L self.num_retries = num_retries self._readline_cache = (None, None) @@ -77,8 +83,8 @@ class ArvadosFileReaderBase(ArvadosFileBase): def decompressed_name(self): return re.sub('\.(bz2|gz)$', '', self.name) - @ArvadosFileBase._before_close - def seek(self, pos, whence=os.SEEK_CUR): + @_FileLikeObjectBase._before_close + def seek(self, pos, whence=os.SEEK_SET): if whence == os.SEEK_CUR: pos += self._filepos elif whence == os.SEEK_END: @@ -88,7 +94,7 @@ class ArvadosFileReaderBase(ArvadosFileBase): def tell(self): return self._filepos - @ArvadosFileBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readall(self, size=2**20, num_retries=None): while True: @@ -97,12 +103,13 @@ class ArvadosFileReaderBase(ArvadosFileBase): break yield data - @ArvadosFileBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readline(self, size=float('inf'), num_retries=None): cache_pos, cache_data = self._readline_cache if self.tell() == cache_pos: data = [cache_data] + self._filepos += len(cache_data) else: data = [''] data_size = len(data[-1]) @@ -118,18 +125,19 @@ class ArvadosFileReaderBase(ArvadosFileBase): except ValueError: nextline_index = len(data) nextline_index = min(nextline_index, size) + self._filepos -= len(data) - nextline_index self._readline_cache = (self.tell(), data[nextline_index:]) return data[:nextline_index] - @ArvadosFileBase._before_close + @_FileLikeObjectBase._before_close @retry_method def decompress(self, decompress, size, num_retries=None): - for segment in self.readall(size, num_retries): + for segment in self.readall(size, num_retries=num_retries): data = decompress(segment) if data: yield data - @ArvadosFileBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readall_decompressed(self, size=2**20, num_retries=None): self.seek(0) @@ -144,7 +152,7 @@ class ArvadosFileReaderBase(ArvadosFileBase): else: return self.readall(size, num_retries=num_retries) - @ArvadosFileBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readlines(self, sizehint=float('inf'), num_retries=None): data = [] @@ -167,8 +175,15 @@ class ArvadosFileReaderBase(ArvadosFileBase): class StreamFileReader(ArvadosFileReaderBase): + class _NameAttribute(str): + # The Python file API provides a plain .name attribute. + # Older SDK provided a name() method. + # This class provides both, for maximum compatibility. + def __call__(self): + return self + def __init__(self, stream, segments, name): - super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries) + super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries) self._stream = stream self.segments = segments @@ -179,7 +194,7 @@ class StreamFileReader(ArvadosFileReaderBase): n = self.segments[-1] return n.range_start + n.range_size - @ArvadosFileBase._before_close + @_FileLikeObjectBase._before_close @retry_method def read(self, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at the current file position""" @@ -190,14 +205,14 @@ class StreamFileReader(ArvadosFileReaderBase): available_chunks = locators_and_ranges(self.segments, self._filepos, size) if available_chunks: lr = available_chunks[0] - data = self._stream._readfrom(lr.locator+lr.segment_offset, + data = self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries) self._filepos += len(data) return data - @ArvadosFileBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readfrom(self, start, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at 'start'""" @@ -206,23 +221,35 @@ class StreamFileReader(ArvadosFileReaderBase): data = [] for lr in locators_and_ranges(self.segments, start, size): - data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size, + data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries)) return ''.join(data) def as_manifest(self): - from stream import normalize_stream segs = [] for r in self.segments: segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size)) return " ".join(normalize_stream(".", {self.name: segs})) + "\n" -class BufferBlock(object): - """ - A BufferBlock is a stand-in for a Keep block that is in the process of being - written. Writers can append to it, get the size, and compute the Keep locator. +def synchronized(orig_func): + @functools.wraps(orig_func) + def synchronized_wrapper(self, *args, **kwargs): + with self.lock: + return orig_func(self, *args, **kwargs) + return synchronized_wrapper + +class StateChangeError(Exception): + def __init__(self, message, state, nextstate): + super(StateChangeError, self).__init__(message) + self.state = state + self.nextstate = nextstate + +class _BufferBlock(object): + """A stand-in for a Keep block that is in the process of being written. + + Writers can append to it, get the size, and compute the Keep locator. There are three valid states: WRITABLE @@ -236,10 +263,13 @@ class BufferBlock(object): released, fetching the block will fetch it via keep client (since we discarded the internal copy), and identifiers referring to the BufferBlock can be replaced with the block locator. + """ + WRITABLE = 0 PENDING = 1 COMMITTED = 2 + ERROR = 3 def __init__(self, blockid, starting_capacity, owner): """ @@ -251,22 +281,28 @@ class BufferBlock(object): :owner: ArvadosFile that owns this block + """ self.blockid = blockid self.buffer_block = bytearray(starting_capacity) self.buffer_view = memoryview(self.buffer_block) self.write_pointer = 0 - self.state = BufferBlock.WRITABLE + self._state = _BufferBlock.WRITABLE self._locator = None self.owner = owner + self.lock = threading.Lock() + self.wait_for_commit = threading.Event() + self.error = None + @synchronized def append(self, data): + """Append some data to the buffer. + + Only valid if the block is in WRITABLE state. Implements an expanding + buffer, doubling capacity as needed to accomdate all the data. + """ - Append some data to the buffer. Only valid if the block is in WRITABLE - state. Implements an expanding buffer, doubling capacity as needed to - accomdate all the data. - """ - if self.state == BufferBlock.WRITABLE: + if self._state == _BufferBlock.WRITABLE: while (self.write_pointer+len(data)) > len(self.buffer_block): new_buffer_block = bytearray(len(self.buffer_block) * 2) new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer] @@ -278,34 +314,60 @@ class BufferBlock(object): else: raise AssertionError("Buffer block is not writable") + STATE_TRANSITIONS = frozenset([ + (WRITABLE, PENDING), + (PENDING, COMMITTED), + (PENDING, ERROR), + (ERROR, PENDING)]) + + @synchronized + def set_state(self, nextstate, val=None): + if (self._state, nextstate) not in self.STATE_TRANSITIONS: + raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate) + self._state = nextstate + + if self._state == _BufferBlock.PENDING: + self.wait_for_commit.clear() + + if self._state == _BufferBlock.COMMITTED: + self._locator = val + self.buffer_view = None + self.buffer_block = None + self.wait_for_commit.set() + + if self._state == _BufferBlock.ERROR: + self.error = val + self.wait_for_commit.set() + + @synchronized + def state(self): + return self._state + def size(self): - """Amount of data written to the buffer""" + """The amount of data written to the buffer.""" return self.write_pointer + @synchronized def locator(self): """The Keep locator for this buffer's contents.""" if self._locator is None: self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size()) return self._locator + @synchronized + def clone(self, new_blockid, owner): + if self._state == _BufferBlock.COMMITTED: + raise AssertionError("Cannot duplicate committed buffer block") + bufferblock = _BufferBlock(new_blockid, self.size(), owner) + bufferblock.append(self.buffer_view[0:self.size()]) + return bufferblock -class AsyncKeepWriteErrors(Exception): - """ - Roll up one or more Keep write exceptions (generated by background - threads) into a single one. - """ - def __init__(self, errors): - self.errors = errors + @synchronized + def clear(self): + self.owner = None + self.buffer_block = None + self.buffer_view = None - def __repr__(self): - return "\n".join(self.errors) - -def _synchronized(orig_func): - @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): - with self.lock: - return orig_func(self, *args, **kwargs) - return wrapper class NoopLock(object): def __enter__(self): @@ -320,43 +382,44 @@ class NoopLock(object): def release(self): pass -SYNC_READONLY = 1 -SYNC_EXPLICIT = 2 -SYNC_LIVE = 3 -def _must_be_writable(orig_func): - # Decorator for methods that read actual Collection data. +def must_be_writable(orig_func): @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): - if self.sync_mode() == SYNC_READONLY: - raise IOError((errno.EROFS, "Collection is read only")) + def must_be_writable_wrapper(self, *args, **kwargs): + if not self.writable(): + raise IOError(errno.EROFS, "Collection is read-only.") return orig_func(self, *args, **kwargs) - return wrapper + return must_be_writable_wrapper -class BlockManager(object): - """ - BlockManager handles buffer blocks, background block uploads, and - background block prefetch for a Collection of ArvadosFiles. +class _BlockManager(object): + """BlockManager handles buffer blocks. + + Also handles background block uploads, and background block prefetch for a + Collection of ArvadosFiles. + """ - def __init__(self, keep): + + DEFAULT_PUT_THREADS = 2 + DEFAULT_GET_THREADS = 2 + + def __init__(self, keep, copies=None): """keep: KeepClient object to use""" self._keep = keep - self._bufferblocks = {} + self._bufferblocks = collections.OrderedDict() self._put_queue = None - self._put_errors = None self._put_threads = None self._prefetch_queue = None self._prefetch_threads = None self.lock = threading.Lock() self.prefetch_enabled = True - self.num_put_threads = 2 - self.num_get_threads = 2 + self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS + self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.copies = copies - @_synchronized + @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): - """ - Allocate a new, empty bufferblock in WRITABLE state and return it. + """Allocate a new, empty bufferblock in WRITABLE state and return it. :blockid: optional block identifier, otherwise one will be automatically assigned @@ -366,40 +429,105 @@ class BlockManager(object): :owner: ArvadosFile that owns this block + """ if blockid is None: blockid = "bufferblock%i" % len(self._bufferblocks) - bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) - self._bufferblocks[bb.blockid] = bb - return bb + bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) + self._bufferblocks[bufferblock.blockid] = bufferblock + return bufferblock - @_synchronized - def dup_block(self, blockid, owner): - """ - Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock. + @synchronized + def dup_block(self, block, owner): + """Create a new bufferblock initialized with the content of an existing bufferblock. - :blockid: - the block to copy. May be an existing buffer block id. + :block: + the buffer block to copy. :owner: ArvadosFile that owns the new block + """ new_blockid = "bufferblock%i" % len(self._bufferblocks) - block = self._bufferblocks[blockid] - bb = BufferBlock(new_blockid, len(block), owner) - bb.append(block) - self._bufferblocks[bb.blockid] = bb - return bb + bufferblock = block.clone(new_blockid, owner) + self._bufferblocks[bufferblock.blockid] = bufferblock + return bufferblock - @_synchronized - def is_bufferblock(self, id): - return id in self._bufferblocks + @synchronized + def is_bufferblock(self, locator): + return locator in self._bufferblocks + + def _commit_bufferblock_worker(self): + """Background uploader thread.""" + + while True: + try: + bufferblock = self._put_queue.get() + if bufferblock is None: + return - @_synchronized + if self.copies is None: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + else: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) + bufferblock.set_state(_BufferBlock.COMMITTED, loc) + + except Exception as e: + bufferblock.set_state(_BufferBlock.ERROR, e) + finally: + if self._put_queue is not None: + self._put_queue.task_done() + + @synchronized + def start_put_threads(self): + if self._put_threads is None: + # Start uploader threads. + + # If we don't limit the Queue size, the upload queue can quickly + # grow to take up gigabytes of RAM if the writing process is + # generating data more quickly than it can be send to the Keep + # servers. + # + # With two upload threads and a queue size of 2, this means up to 4 + # blocks pending. If they are full 64 MiB blocks, that means up to + # 256 MiB of internal buffering, which is the same size as the + # default download block cache in KeepClient. + self._put_queue = Queue.Queue(maxsize=2) + + self._put_threads = [] + for i in xrange(0, self.num_put_threads): + thread = threading.Thread(target=self._commit_bufferblock_worker) + self._put_threads.append(thread) + thread.daemon = True + thread.start() + + def _block_prefetch_worker(self): + """The background downloader thread.""" + while True: + try: + b = self._prefetch_queue.get() + if b is None: + return + self._keep.get(b) + except Exception: + pass + + @synchronized + def start_get_threads(self): + if self._prefetch_threads is None: + self._prefetch_queue = Queue.Queue() + self._prefetch_threads = [] + for i in xrange(0, self.num_get_threads): + thread = threading.Thread(target=self._block_prefetch_worker) + self._prefetch_threads.append(thread) + thread.daemon = True + thread.start() + + + @synchronized def stop_threads(self): - """ - Shut down and wait for background upload and download threads to finish. - """ + """Shut down and wait for background upload and download threads to finish.""" + if self._put_threads is not None: for t in self._put_threads: self._put_queue.put(None) @@ -407,7 +535,6 @@ class BlockManager(object): t.join() self._put_threads = None self._put_queue = None - self._put_errors = None if self._prefetch_threads is not None: for t in self._prefetch_threads: @@ -417,143 +544,181 @@ class BlockManager(object): self._prefetch_threads = None self._prefetch_queue = None - def commit_bufferblock(self, block): - """ - Initiate a background upload of a bufferblock. This will block if the - upload queue is at capacity, otherwise it will return immediately. - """ + def __enter__(self): + return self - def worker(self): - """ - Background uploader thread. - """ - while True: - try: - b = self._put_queue.get() - if b is None: - return - b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes()) - b.state = BufferBlock.COMMITTED - b.buffer_view = None - b.buffer_block = None - except Exception as e: - print e - self._put_errors.put(e) - finally: - if self._put_queue is not None: - self._put_queue.task_done() + def __exit__(self, exc_type, exc_value, traceback): + self.stop_threads() + + @synchronized + def repack_small_blocks(self, force=False, sync=False): + """Packs small blocks together before uploading""" + # Search blocks ready for getting packed together before being committed to Keep. + small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + if len(small_blocks) <= 1: + # Not enough small blocks for repacking + return + + # Check if there are enough small blocks for filling up one in full + pending_write_size = sum([b.size() for b in small_blocks]) + if force or (pending_write_size >= config.KEEP_BLOCK_SIZE): + new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None) + self._bufferblocks[new_bb.blockid] = new_bb + size = 0 + while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: + bb = small_blocks.pop(0) + size += bb.size() + arvfile = bb.owner + new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) + arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())]) + bb.clear() + del self._bufferblocks[bb.blockid] + self.commit_bufferblock(new_bb, sync=sync) + + def commit_bufferblock(self, block, sync): + """Initiate a background upload of a bufferblock. + + :block: + The block object to upload + + :sync: + If `sync` is True, upload the block synchronously. + If `sync` is False, upload the block asynchronously. This will + return immediately unless the upload queue is at capacity, in + which case it will wait on an upload queue slot. - with self.lock: - if self._put_threads is None: - # Start uploader threads. - - # If we don't limit the Queue size, the upload queue can quickly - # grow to take up gigabytes of RAM if the writing process is - # generating data more quickly than it can be send to the Keep - # servers. - # - # With two upload threads and a queue size of 2, this means up to 4 - # blocks pending. If they are full 64 MiB blocks, that means up to - # 256 MiB of internal buffering, which is the same size as the - # default download block cache in KeepClient. - self._put_queue = Queue.Queue(maxsize=2) - self._put_errors = Queue.Queue() - - self._put_threads = [] - for i in xrange(0, self.num_put_threads): - t = threading.Thread(target=worker, args=(self,)) - self._put_threads.append(t) - t.daemon = True - t.start() - - # Mark the block as PENDING so to disallow any more appends. - block.state = BufferBlock.PENDING - self._put_queue.put(block) - - def get_block(self, locator, num_retries, cache_only=False): """ - Fetch a block. First checks to see if the locator is a BufferBlock and - return that, if not, passes the request through to KeepClient.get(). + try: + # Mark the block as PENDING so to disallow any more appends. + block.set_state(_BufferBlock.PENDING) + except StateChangeError as e: + if e.state == _BufferBlock.PENDING: + if sync: + block.wait_for_commit.wait() + else: + return + if block.state() == _BufferBlock.COMMITTED: + return + elif block.state() == _BufferBlock.ERROR: + raise block.error + else: + raise + + if sync: + try: + if self.copies is None: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + else: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) + block.set_state(_BufferBlock.COMMITTED, loc) + except Exception as e: + block.set_state(_BufferBlock.ERROR, e) + raise + else: + self.start_put_threads() + self._put_queue.put(block) + + @synchronized + def get_bufferblock(self, locator): + return self._bufferblocks.get(locator) + + @synchronized + def delete_bufferblock(self, locator): + bb = self._bufferblocks[locator] + bb.clear() + del self._bufferblocks[locator] + + def get_block_contents(self, locator, num_retries, cache_only=False): + """Fetch a block. + + First checks to see if the locator is a BufferBlock and return that, if + not, passes the request through to KeepClient.get(). + """ with self.lock: if locator in self._bufferblocks: - bb = self._bufferblocks[locator] - if bb.state != BufferBlock.COMMITTED: - return bb.buffer_view[0:bb.write_pointer].tobytes() + bufferblock = self._bufferblocks[locator] + if bufferblock.state() != _BufferBlock.COMMITTED: + return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes() else: - locator = bb._locator - return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only) + locator = bufferblock._locator + if cache_only: + return self._keep.get_from_cache(locator) + else: + return self._keep.get(locator, num_retries=num_retries) def commit_all(self): + """Commit all outstanding buffer blocks. + + This is a synchronous call, and will not return until all buffer blocks + are uploaded. Raises KeepWriteError() if any blocks failed to upload. + """ - Commit all outstanding buffer blocks. Unlike commit_bufferblock(), this - is a synchronous call, and will not return until all buffer blocks are - uploaded. Raises AsyncKeepWriteErrors() if any blocks failed to - upload. - """ + self.repack_small_blocks(force=True, sync=True) + with self.lock: items = self._bufferblocks.items() for k,v in items: - if v.state == BufferBlock.WRITABLE: - self.commit_bufferblock(v) + if v.state() != _BufferBlock.COMMITTED and v.owner: + v.owner.flush(sync=False) with self.lock: if self._put_queue is not None: self._put_queue.join() - if not self._put_errors.empty(): - e = [] - try: - while True: - e.append(self._put_errors.get(False)) - except Queue.Empty: - pass - raise AsyncKeepWriteErrors(e) + + err = [] + for k,v in items: + if v.state() == _BufferBlock.ERROR: + err.append((v.locator(), v.error)) + if err: + raise KeepWriteError("Error writing some blocks", err, label="block") + + for k,v in items: + # flush again with sync=True to remove committed bufferblocks from + # the segments. + if v.owner: + v.owner.flush(sync=True) def block_prefetch(self, locator): - """ - Initiate a background download of a block. This assumes that the - underlying KeepClient implements a block cache, so repeated requests - for the same block will not result in repeated downloads (unless the - block is evicted from the cache.) This method does not block. + """Initiate a background download of a block. + + This assumes that the underlying KeepClient implements a block cache, + so repeated requests for the same block will not result in repeated + downloads (unless the block is evicted from the cache.) This method + does not block. + """ if not self.prefetch_enabled: return - def worker(self): - """Background downloader thread.""" - while True: - try: - b = self._prefetch_queue.get() - if b is None: - return - self._keep.get(b) - except: - pass + if self._keep.get_from_cache(locator) is not None: + return with self.lock: if locator in self._bufferblocks: return - if self._prefetch_threads is None: - self._prefetch_queue = Queue.Queue() - self._prefetch_threads = [] - for i in xrange(0, self.num_get_threads): - t = threading.Thread(target=worker, args=(self,)) - self._prefetch_threads.append(t) - t.daemon = True - t.start() + + self.start_get_threads() self._prefetch_queue.put(locator) class ArvadosFile(object): - """ - ArvadosFile manages the underlying representation of a file in Keep as a sequence of - segments spanning a set of blocks, and implements random read/write access. + """Represent a file in a Collection. + + ArvadosFile manages the underlying representation of a file in Keep as a + sequence of segments spanning a set of blocks, and implements random + read/write access. + + This object may be accessed from multiple threads. + """ - def __init__(self, parent, stream=[], segments=[]): + def __init__(self, parent, name, stream=[], segments=[]): """ + ArvadosFile constructor. + :stream: a list of Range objects representing a block stream @@ -561,69 +726,137 @@ class ArvadosFile(object): a list of Range objects representing segments """ self.parent = parent - self._modified = True + self.name = name + self._writers = set() + self._committed = False self._segments = [] + self.lock = parent.root_collection().lock for s in segments: self._add_segment(stream, s.locator, s.range_size) self._current_bblock = None - if parent.sync_mode() == SYNC_READONLY: - self.lock = NoopLock() - else: - self.lock = threading.Lock() - def sync_mode(self): - return self.parent.sync_mode() + def writable(self): + return self.parent.writable() - @_synchronized + @synchronized def segments(self): return copy.copy(self._segments) - @_synchronized - def clone(self, new_parent): + @synchronized + def clone(self, new_parent, new_name): """Make a copy of this file.""" - cp = ArvadosFile(new_parent) - cp._modified = False + cp = ArvadosFile(new_parent, new_name) + cp.replace_contents(self) + return cp + + @must_be_writable + @synchronized + def replace_contents(self, other): + """Replace segments of this file with segments from another `ArvadosFile` object.""" map_loc = {} - for r in self._segments: - new_loc = r.locator - if self.parent._my_block_manager().is_bufferblock(r.locator): - if r.locator not in map_loc: - map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid - new_loc = map_loc[r.locator] + self._segments = [] + for other_segment in other.segments(): + new_loc = other_segment.locator + if other.parent._my_block_manager().is_bufferblock(other_segment.locator): + if other_segment.locator not in map_loc: + bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator) + if bufferblock.state() != _BufferBlock.WRITABLE: + map_loc[other_segment.locator] = bufferblock.locator() + else: + map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid + new_loc = map_loc[other_segment.locator] - cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset)) + self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) - return cp + self._committed = False - @_synchronized def __eq__(self, other): - if type(other) != ArvadosFile: + if other is self: + return True + if not isinstance(other, ArvadosFile): return False - return self._segments == other.segments() + + othersegs = other.segments() + with self.lock: + if len(self._segments) != len(othersegs): + return False + for i in xrange(0, len(othersegs)): + seg1 = self._segments[i] + seg2 = othersegs[i] + loc1 = seg1.locator + loc2 = seg2.locator + + if self.parent._my_block_manager().is_bufferblock(loc1): + loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator() + + if other.parent._my_block_manager().is_bufferblock(loc2): + loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator() + + if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or + seg1.range_start != seg2.range_start or + seg1.range_size != seg2.range_size or + seg1.segment_offset != seg2.segment_offset): + return False + + return True def __ne__(self, other): return not self.__eq__(other) - @_synchronized - def set_unmodified(self): - """Clear the modified flag""" - self._modified = False + @synchronized + def set_segments(self, segs): + self._segments = segs - @_synchronized - def modified(self): - """Test the modified flag""" - return self._modified + @synchronized + def set_committed(self): + """Set committed flag to True""" + self._committed = True - @_must_be_writable - @_synchronized - def truncate(self, size): + @synchronized + def committed(self): + """Get whether this is committed or not.""" + return self._committed + + @synchronized + def add_writer(self, writer): + """Add an ArvadosFileWriter reference to the list of writers""" + if isinstance(writer, ArvadosFileWriter): + self._writers.add(writer) + + @synchronized + def remove_writer(self, writer): + """ + Called from ArvadosFileWriter.close(). Remove a writer reference from the list + and do some block maintenance tasks. """ - Adjust the size of the file. If `size` is less than the size of the file, - the file contents after `size` will be discarded. If `size` is greater - than the current size of the file, an IOError will be raised. + self._writers.remove(writer) + + if self.size() > config.KEEP_BLOCK_SIZE / 2: + # File writer closed, not small enough for repacking + self.flush() + elif self.closed(): + # All writers closed and size is adequate for repacking + self.parent._my_block_manager().repack_small_blocks() + + def closed(self): + """ + Get whether this is closed or not. When the writers list is empty, the file + is supposed to be closed. + """ + return len(self._writers) == 0 + + @must_be_writable + @synchronized + def truncate(self, size): + """Shrink the size of the file. + + If `size` is less than the size of the file, the file contents after + `size` will be discarded. If `size` is greater than the current size + of the file, an IOError will be raised. + """ - if size < self._size(): + if size < self.size(): new_segs = [] for r in self._segments: range_end = r.range_start+r.range_size @@ -631,7 +864,7 @@ class ArvadosFile(object): # segment is past the trucate size, all done break elif size < range_end: - nr = Range(r.locator, r.range_start, size - r.range_start) + nr = Range(r.locator, r.range_start, size - r.range_start, 0) nr.segment_offset = r.segment_offset new_segs.append(nr) break @@ -639,36 +872,50 @@ class ArvadosFile(object): new_segs.append(r) self._segments = new_segs - self._modified = True - elif size > self._size(): - raise IOError("truncate() does not support extending the file size") + self._committed = False + elif size > self.size(): + raise IOError(errno.EINVAL, "truncate() does not support extending the file size") - @_synchronized - def readfrom(self, offset, size, num_retries): - """ - read upto `size` bytes from the file starting at `offset`. + def readfrom(self, offset, size, num_retries, exact=False): + """Read up to `size` bytes from the file starting at `offset`. + + :exact: + If False (default), return less data than requested if the read + crosses a block boundary and the next block isn't cached. If True, + only return less data than requested when hitting EOF. """ - if size == 0 or offset >= self._size(): - return '' - data = [] - for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE): - self.parent._my_block_manager().block_prefetch(lr.locator) + with self.lock: + if size == 0 or offset >= self.size(): + return '' + readsegs = locators_and_ranges(self._segments, offset, size) + prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32) - for lr in locators_and_ranges(self._segments, offset, size): - d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data)) - if d: - data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size]) + locs = set() + data = [] + for lr in readsegs: + block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact)) + if block: + blockview = memoryview(block) + data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes()) + locs.add(lr.locator) else: break + + for lr in prefetch: + if lr.locator not in locs: + self.parent._my_block_manager().block_prefetch(lr.locator) + locs.add(lr.locator) + return ''.join(data) - def _repack_writes(self): - """ - Test if the buffer block has more data than is referenced by actual segments - (this happens when a buffered write over-writes a file range written in - a previous buffered write). Re-pack the buffer block for efficiency + def _repack_writes(self, num_retries): + """Test if the buffer block has more data than actual segments. + + This happens when a buffered write over-writes a file range written in + a previous buffered write. Re-pack the buffer block for efficiency and to avoid leaking information. + """ segs = self._segments @@ -680,94 +927,189 @@ class ArvadosFile(object): if write_total < self._current_bblock.size(): # There is more data in the buffer block than is actually accounted for by segments, so # re-pack into a new buffer by copying over to a new buffer block. + contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries) new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self) for t in bufferblock_segs: - new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes()) + new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) t.segment_offset = new_bb.size() - t.range_size self._current_bblock = new_bb - @_must_be_writable - @_synchronized + @must_be_writable + @synchronized def writeto(self, offset, data, num_retries): - """ - Write `data` to the file starting at `offset`. This will update - existing bytes and/or extend the size of the file as necessary. + """Write `data` to the file starting at `offset`. + + This will update existing bytes and/or extend the size of the file as + necessary. + """ if len(data) == 0: return - if offset > self._size(): + if offset > self.size(): raise ArgumentError("Offset is past the end of the file") if len(data) > config.KEEP_BLOCK_SIZE: - raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE)) + # Chunk it up into smaller writes + n = 0 + dataview = memoryview(data) + while n < len(data): + self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries) + n += config.KEEP_BLOCK_SIZE + return - self._modified = True + self._committed = False - if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE: + if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: - self._repack_writes() + self._repack_writes(num_retries) if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: - self.parent._my_block_manager().commit_bufferblock(self._current_bblock) + self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) self._current_bblock.append(data) replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) - @_must_be_writable - @_synchronized + self.parent.notify(WRITE, self.parent, self.name, (self, self)) + + return len(data) + + @synchronized + def flush(self, sync=True, num_retries=0): + """Flush the current bufferblock to Keep. + + :sync: + If True, commit block synchronously, wait until buffer block has been written. + If False, commit block asynchronously, return immediately after putting block into + the keep put queue. + """ + if self.committed(): + return + + if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: + if self._current_bblock.state() == _BufferBlock.WRITABLE: + self._repack_writes(num_retries) + self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) + + if sync: + to_delete = set() + for s in self._segments: + bb = self.parent._my_block_manager().get_bufferblock(s.locator) + if bb: + if bb.state() != _BufferBlock.COMMITTED: + self.parent._my_block_manager().commit_bufferblock(bb, sync=True) + to_delete.add(s.locator) + s.locator = bb.locator() + for s in to_delete: + self.parent._my_block_manager().delete_bufferblock(s) + + self.parent.notify(MOD, self.parent, self.name, (self, self)) + + @must_be_writable + @synchronized def add_segment(self, blocks, pos, size): - # Synchronized public api, see _add_segment + """Add a segment to the end of the file. + + `pos` and `offset` reference a section of the stream described by + `blocks` (a list of Range objects) + + """ self._add_segment(blocks, pos, size) def _add_segment(self, blocks, pos, size): - """ - Add a segment to the end of the file, with `pos` and `offset` referencing a - section of the stream described by `blocks` (a list of Range objects) - """ - self._modified = True + """Internal implementation of add_segment.""" + self._committed = False for lr in locators_and_ranges(blocks, pos, size): - last = self._segments[-1] if self._segments else Range(0, 0, 0) + last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) self._segments.append(r) - def _size(self): - """Get the file size""" + @synchronized + def size(self): + """Get the file size.""" if self._segments: n = self._segments[-1] return n.range_start + n.range_size else: return 0 - @_synchronized - def size(self): - """Get the file size""" - return self._size() + @synchronized + def manifest_text(self, stream_name=".", portable_locators=False, normalize=False): + buf = "" + filestream = [] + for segment in self.segments: + loc = segment.locator + if loc.startswith("bufferblock"): + loc = self._bufferblocks[loc].calculate_locator() + if portable_locators: + loc = KeepLocator(loc).stripped() + filestream.append(LocatorAndRange(loc, locator_block_size(loc), + segment.segment_offset, segment.range_size)) + buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream})) + buf += "\n" + return buf + + @must_be_writable + @synchronized + def _reparent(self, newparent, newname): + self._committed = False + self.flush(sync=True) + self.parent.remove(self.name) + self.parent = newparent + self.name = newname + self.lock = self.parent.root_collection().lock + class ArvadosFileReader(ArvadosFileReaderBase): - def __init__(self, arvadosfile, name, mode="r", num_retries=None): - super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries) + """Wraps ArvadosFile in a file-like object supporting reading only. + + Be aware that this class is NOT thread safe as there is no locking around + updating file pointer. + + """ + + def __init__(self, arvadosfile, num_retries=None): + super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries) self.arvadosfile = arvadosfile def size(self): return self.arvadosfile.size() - @ArvadosFileBase._before_close + def stream_name(self): + return self.arvadosfile.parent.stream_name() + + @_FileLikeObjectBase._before_close @retry_method - def read(self, size, num_retries=None): - """Read up to `size` bytes from the stream, starting at the current file position""" - data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries) - self._filepos += len(data) - return data + def read(self, size=None, num_retries=None): + """Read up to `size` bytes from the file and return the result. + + Starts at the current file position. If `size` is None, read the + entire remainder of the file. + """ + if size is None: + data = [] + rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) + while rd: + data.append(rd) + self._filepos += len(rd) + rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) + return ''.join(data) + else: + data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True) + self._filepos += len(data) + return data - @ArvadosFileBase._before_close + @_FileLikeObjectBase._before_close @retry_method def readfrom(self, offset, size, num_retries=None): - """Read up to `size` bytes from the stream, starting at the current file position""" + """Read up to `size` bytes from the stream, starting at the specified file offset. + + This method does not change the file position. + """ return self.arvadosfile.readfrom(offset, size, num_retries) def flush(self): @@ -775,27 +1117,47 @@ class ArvadosFileReader(ArvadosFileReaderBase): class ArvadosFileWriter(ArvadosFileReader): - def __init__(self, arvadosfile, name, mode, num_retries=None): - super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries) + """Wraps ArvadosFile in a file-like object supporting both reading and writing. + + Be aware that this class is NOT thread safe as there is no locking around + updating file pointer. - @ArvadosFileBase._before_close + """ + + def __init__(self, arvadosfile, mode, num_retries=None): + super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries) + self.mode = mode + self.arvadosfile.add_writer(self) + + @_FileLikeObjectBase._before_close @retry_method def write(self, data, num_retries=None): if self.mode[0] == "a": - self.arvadosfile.writeto(self.size(), data) + self.arvadosfile.writeto(self.size(), data, num_retries) else: self.arvadosfile.writeto(self._filepos, data, num_retries) self._filepos += len(data) + return len(data) - @ArvadosFileBase._before_close + @_FileLikeObjectBase._before_close @retry_method def writelines(self, seq, num_retries=None): for s in seq: - self.write(s) + self.write(s, num_retries=num_retries) + @_FileLikeObjectBase._before_close def truncate(self, size=None): if size is None: size = self._filepos self.arvadosfile.truncate(size) if self._filepos > self.size(): self._filepos = self.size() + + @_FileLikeObjectBase._before_close + def flush(self): + self.arvadosfile.flush() + + def close(self): + if not self.closed: + self.arvadosfile.remove_writer(self) + super(ArvadosFileWriter, self).close()