X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dcab7519db659e2487db893fda2c8237203c3294..ea6f25f0dde5c750eacea29662c19149c7800134:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 71af6445a5..6893b94bf7 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -1,20 +1,34 @@ -import functools -import os -import zlib +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import absolute_import +from __future__ import division +from future import standard_library +from future.utils import listitems, listvalues +standard_library.install_aliases() +from builtins import range +from builtins import object import bz2 -import config -import hashlib -import threading -import Queue +import collections import copy import errno -import re +import functools +import hashlib import logging +import os +import queue +import re +import sys +import threading +import uuid +import zlib +from . import config from .errors import KeepWriteError, AssertionError, ArgumentError from .keep import KeepLocator from ._normalize_stream import normalize_stream -from ._ranges import locators_and_ranges, replace_range, Range +from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange from .retry import retry_method MOD = "mod" @@ -36,6 +50,12 @@ def split(path): stream_name, file_name = '.', path return stream_name, file_name + +class UnownedBlockError(Exception): + """Raised when there's an writable block without an owner on the BlockManager.""" + pass + + class _FileLikeObjectBase(object): def __init__(self, name, mode): self.name = name @@ -68,7 +88,7 @@ class _FileLikeObjectBase(object): class ArvadosFileReaderBase(_FileLikeObjectBase): def __init__(self, name, mode, num_retries=None): super(ArvadosFileReaderBase, self).__init__(name, mode) - self._filepos = 0L + self._filepos = 0 self.num_retries = num_retries self._readline_cache = (None, None) @@ -88,17 +108,29 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): pos += self._filepos elif whence == os.SEEK_END: pos += self.size() - self._filepos = min(max(pos, 0L), self.size()) + if pos < 0: + raise IOError(errno.EINVAL, "Tried to seek to negative file offset.") + self._filepos = pos + return self._filepos def tell(self): return self._filepos + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return True + @_FileLikeObjectBase._before_close @retry_method def readall(self, size=2**20, num_retries=None): while True: data = self.read(size, num_retries=num_retries) - if data == '': + if len(data) == 0: break yield data @@ -108,23 +140,25 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): cache_pos, cache_data = self._readline_cache if self.tell() == cache_pos: data = [cache_data] + self._filepos += len(cache_data) else: - data = [''] + data = [b''] data_size = len(data[-1]) - while (data_size < size) and ('\n' not in data[-1]): + while (data_size < size) and (b'\n' not in data[-1]): next_read = self.read(2 ** 20, num_retries=num_retries) if not next_read: break data.append(next_read) data_size += len(next_read) - data = ''.join(data) + data = b''.join(data) try: - nextline_index = data.index('\n') + 1 + nextline_index = data.index(b'\n') + 1 except ValueError: nextline_index = len(data) nextline_index = min(nextline_index, size) + self._filepos -= len(data) - nextline_index self._readline_cache = (self.tell(), data[nextline_index:]) - return data[:nextline_index] + return data[:nextline_index].decode() @_FileLikeObjectBase._before_close @retry_method @@ -159,16 +193,16 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): data_size += len(s) if data_size >= sizehint: break - return ''.join(data).splitlines(True) + return b''.join(data).decode().splitlines(True) def size(self): - raise NotImplementedError() + raise IOError(errno.ENOSYS, "Not implemented") def read(self, size, num_retries=None): - raise NotImplementedError() + raise IOError(errno.ENOSYS, "Not implemented") def readfrom(self, start, size, num_retries=None): - raise NotImplementedError() + raise IOError(errno.ENOSYS, "Not implemented") class StreamFileReader(ArvadosFileReaderBase): @@ -196,15 +230,15 @@ class StreamFileReader(ArvadosFileReaderBase): def read(self, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at the current file position""" if size == 0: - return '' + return b'' - data = '' + data = b'' available_chunks = locators_and_ranges(self.segments, self._filepos, size) if available_chunks: lr = available_chunks[0] data = self._stream.readfrom(lr.locator+lr.segment_offset, - lr.segment_size, - num_retries=num_retries) + lr.segment_size, + num_retries=num_retries) self._filepos += len(data) return data @@ -214,13 +248,13 @@ class StreamFileReader(ArvadosFileReaderBase): def readfrom(self, start, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at 'start'""" if size == 0: - return '' + return b'' data = [] for lr in locators_and_ranges(self.segments, start, size): data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries)) - return ''.join(data) + return b''.join(data) def as_manifest(self): segs = [] @@ -267,6 +301,7 @@ class _BufferBlock(object): PENDING = 1 COMMITTED = 2 ERROR = 3 + DELETED = 4 def __init__(self, blockid, starting_capacity, owner): """ @@ -300,6 +335,8 @@ class _BufferBlock(object): """ if self._state == _BufferBlock.WRITABLE: + if not isinstance(data, bytes) and not isinstance(data, memoryview): + data = data.encode() while (self.write_pointer+len(data)) > len(self.buffer_block): new_buffer_block = bytearray(len(self.buffer_block) * 2) new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer] @@ -361,10 +398,54 @@ class _BufferBlock(object): @synchronized def clear(self): + self._state = _BufferBlock.DELETED self.owner = None self.buffer_block = None self.buffer_view = None + @synchronized + def repack_writes(self): + """Optimize buffer block by repacking segments in file sequence. + + When the client makes random writes, they appear in the buffer block in + the sequence they were written rather than the sequence they appear in + the file. This makes for inefficient, fragmented manifests. Attempt + to optimize by repacking writes in file sequence. + + """ + if self._state != _BufferBlock.WRITABLE: + raise AssertionError("Cannot repack non-writable block") + + segs = self.owner.segments() + + # Collect the segments that reference the buffer block. + bufferblock_segs = [s for s in segs if s.locator == self.blockid] + + # Collect total data referenced by segments (could be smaller than + # bufferblock size if a portion of the file was written and + # then overwritten). + write_total = sum([s.range_size for s in bufferblock_segs]) + + if write_total < self.size() or len(bufferblock_segs) > 1: + # If there's more than one segment referencing this block, it is + # due to out-of-order writes and will produce a fragmented + # manifest, so try to optimize by re-packing into a new buffer. + contents = self.buffer_view[0:self.write_pointer].tobytes() + new_bb = _BufferBlock(None, write_total, None) + for t in bufferblock_segs: + new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) + t.segment_offset = new_bb.size() - t.range_size + + self.buffer_block = new_bb.buffer_block + self.buffer_view = new_bb.buffer_view + self.write_pointer = new_bb.write_pointer + self._locator = None + new_bb.clear() + self.owner.set_segments(segs) + + def __repr__(self): + return "" % (self.blockid) + class NoopLock(object): def __enter__(self): @@ -400,18 +481,26 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep): + def __init__(self, keep, copies=None, put_threads=None, num_retries=None): """keep: KeepClient object to use""" self._keep = keep - self._bufferblocks = {} + self._bufferblocks = collections.OrderedDict() self._put_queue = None self._put_threads = None self._prefetch_queue = None self._prefetch_threads = None self.lock = threading.Lock() self.prefetch_enabled = True - self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS + if put_threads: + self.num_put_threads = put_threads + else: + self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.copies = copies + self._pending_write_size = 0 + self.threads_lock = threading.Lock() + self.padding_block = None + self.num_retries = num_retries @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -427,8 +516,11 @@ class _BlockManager(object): ArvadosFile that owns this block """ + return self._alloc_bufferblock(blockid, starting_capacity, owner) + + def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): if blockid is None: - blockid = "bufferblock%i" % len(self._bufferblocks) + blockid = str(uuid.uuid4()) bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -444,7 +536,7 @@ class _BlockManager(object): ArvadosFile that owns the new block """ - new_blockid = "bufferblock%i" % len(self._bufferblocks) + new_blockid = str(uuid.uuid4()) bufferblock = block.clone(new_blockid, owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -462,37 +554,39 @@ class _BlockManager(object): if bufferblock is None: return - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries) + else: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies) bufferblock.set_state(_BufferBlock.COMMITTED, loc) - except Exception as e: bufferblock.set_state(_BufferBlock.ERROR, e) finally: if self._put_queue is not None: self._put_queue.task_done() - @synchronized def start_put_threads(self): - if self._put_threads is None: - # Start uploader threads. - - # If we don't limit the Queue size, the upload queue can quickly - # grow to take up gigabytes of RAM if the writing process is - # generating data more quickly than it can be send to the Keep - # servers. - # - # With two upload threads and a queue size of 2, this means up to 4 - # blocks pending. If they are full 64 MiB blocks, that means up to - # 256 MiB of internal buffering, which is the same size as the - # default download block cache in KeepClient. - self._put_queue = Queue.Queue(maxsize=2) - - self._put_threads = [] - for i in xrange(0, self.num_put_threads): - thread = threading.Thread(target=self._commit_bufferblock_worker) - self._put_threads.append(thread) - thread.daemon = True - thread.start() + with self.threads_lock: + if self._put_threads is None: + # Start uploader threads. + + # If we don't limit the Queue size, the upload queue can quickly + # grow to take up gigabytes of RAM if the writing process is + # generating data more quickly than it can be send to the Keep + # servers. + # + # With two upload threads and a queue size of 2, this means up to 4 + # blocks pending. If they are full 64 MiB blocks, that means up to + # 256 MiB of internal buffering, which is the same size as the + # default download block cache in KeepClient. + self._put_queue = queue.Queue(maxsize=2) + + self._put_threads = [] + for i in range(0, self.num_put_threads): + thread = threading.Thread(target=self._commit_bufferblock_worker) + self._put_threads.append(thread) + thread.daemon = True + thread.start() def _block_prefetch_worker(self): """The background downloader thread.""" @@ -503,14 +597,14 @@ class _BlockManager(object): return self._keep.get(b) except Exception: - pass + _logger.exception("Exception doing block prefetch") @synchronized def start_get_threads(self): if self._prefetch_threads is None: - self._prefetch_queue = Queue.Queue() + self._prefetch_queue = queue.Queue() self._prefetch_threads = [] - for i in xrange(0, self.num_get_threads): + for i in range(0, self.num_get_threads): thread = threading.Thread(target=self._block_prefetch_worker) self._prefetch_threads.append(thread) thread.daemon = True @@ -543,6 +637,63 @@ class _BlockManager(object): def __exit__(self, exc_type, exc_value, traceback): self.stop_threads() + @synchronized + def repack_small_blocks(self, force=False, sync=False, closed_file_size=0): + """Packs small blocks together before uploading""" + + self._pending_write_size += closed_file_size + + # Check if there are enough small blocks for filling up one in full + if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)): + return + + # Search blocks ready for getting packed together before being + # committed to Keep. + # A WRITABLE block always has an owner. + # A WRITABLE block with its owner.closed() implies that its + # size is <= KEEP_BLOCK_SIZE/2. + try: + small_blocks = [b for b in listvalues(self._bufferblocks) + if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + except AttributeError: + # Writable blocks without owner shouldn't exist. + raise UnownedBlockError() + + if len(small_blocks) <= 1: + # Not enough small blocks for repacking + return + + for bb in small_blocks: + bb.repack_writes() + + # Update the pending write size count with its true value, just in case + # some small file was opened, written and closed several times. + self._pending_write_size = sum([b.size() for b in small_blocks]) + + if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: + return + + new_bb = self._alloc_bufferblock() + new_bb.owner = [] + files = [] + while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: + bb = small_blocks.pop(0) + new_bb.owner.append(bb.owner) + self._pending_write_size -= bb.size() + new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) + files.append((bb, new_bb.write_pointer - bb.size())) + + self.commit_bufferblock(new_bb, sync=sync) + + for bb, new_bb_segment_offset in files: + newsegs = bb.owner.segments() + for s in newsegs: + if s.locator == bb.blockid: + s.locator = new_bb.blockid + s.segment_offset = new_bb_segment_offset+s.segment_offset + bb.owner.set_segments(newsegs) + self._delete_bufferblock(bb.blockid) + def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. @@ -556,7 +707,6 @@ class _BlockManager(object): which case it will wait on an upload queue slot. """ - try: # Mark the block as PENDING so to disallow any more appends. block.set_state(_BufferBlock.PENDING) @@ -575,7 +725,10 @@ class _BlockManager(object): if sync: try: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries) + else: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies) block.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: block.set_state(_BufferBlock.ERROR, e) @@ -588,8 +741,27 @@ class _BlockManager(object): def get_bufferblock(self, locator): return self._bufferblocks.get(locator) + @synchronized + def get_padding_block(self): + """Get a bufferblock 64 MB in size consisting of all zeros, used as padding + when using truncate() to extend the size of a file. + + For reference (and possible future optimization), the md5sum of the + padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864 + + """ + + if self.padding_block is None: + self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE) + self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE + self.commit_bufferblock(self.padding_block, False) + return self.padding_block + @synchronized def delete_bufferblock(self, locator): + self._delete_bufferblock(locator) + + def _delete_bufferblock(self, locator): bb = self._bufferblocks[locator] bb.clear() del self._bufferblocks[locator] @@ -620,12 +792,17 @@ class _BlockManager(object): are uploaded. Raises KeepWriteError() if any blocks failed to upload. """ + self.repack_small_blocks(force=True, sync=True) + with self.lock: - items = self._bufferblocks.items() + items = listitems(self._bufferblocks) for k,v in items: - if v.state() != _BufferBlock.COMMITTED: - v.owner.flush(sync=False) + if v.state() != _BufferBlock.COMMITTED and v.owner: + # Ignore blocks with a list of owners, as if they're not in COMMITTED + # state, they're already being committed asynchronously. + if isinstance(v.owner, ArvadosFile): + v.owner.flush(sync=False) with self.lock: if self._put_queue is not None: @@ -642,7 +819,15 @@ class _BlockManager(object): # flush again with sync=True to remove committed bufferblocks from # the segments. if v.owner: - v.owner.flush(sync=True) + if isinstance(v.owner, ArvadosFile): + v.owner.flush(sync=True) + elif isinstance(v.owner, list) and len(v.owner) > 0: + # This bufferblock is referenced by many files as a result + # of repacking small blocks, so don't delete it when flushing + # its owners, just do it after flushing them all. + for owner in v.owner: + owner.flush(sync=True) + self.delete_bufferblock(k) def block_prefetch(self, locator): """Initiate a background download of a block. @@ -679,6 +864,9 @@ class ArvadosFile(object): """ + __slots__ = ('parent', 'name', '_writers', '_committed', + '_segments', 'lock', '_current_bblock', 'fuse_entry') + def __init__(self, parent, name, stream=[], segments=[]): """ ArvadosFile constructor. @@ -691,6 +879,7 @@ class ArvadosFile(object): """ self.parent = parent self.name = name + self._writers = set() self._committed = False self._segments = [] self.lock = parent.root_collection().lock @@ -701,6 +890,46 @@ class ArvadosFile(object): def writable(self): return self.parent.writable() + @synchronized + def permission_expired(self, as_of_dt=None): + """Returns True if any of the segment's locators is expired""" + for r in self._segments: + if KeepLocator(r.locator).permission_expired(as_of_dt): + return True + return False + + @synchronized + def has_remote_blocks(self): + """Returns True if any of the segment's locators has a +R signature""" + + for s in self._segments: + if '+R' in s.locator: + return True + return False + + @synchronized + def _copy_remote_blocks(self, remote_blocks={}): + """Ask Keep to copy remote blocks and point to their local copies. + + This is called from the parent Collection. + + :remote_blocks: + Shared cache of remote to local block mappings. This is used to avoid + doing extra work when blocks are shared by more than one file in + different subdirectories. + """ + + for s in self._segments: + if '+R' in s.locator: + try: + loc = remote_blocks[s.locator] + except KeyError: + loc = self.parent._my_keep().refresh_signature(s.locator) + remote_blocks[s.locator] = loc + s.locator = loc + self.parent.set_committed(False) + return remote_blocks + @synchronized def segments(self): return copy.copy(self._segments) @@ -732,7 +961,7 @@ class ArvadosFile(object): self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) - self._committed = False + self.set_committed(False) def __eq__(self, other): if other is self: @@ -744,7 +973,7 @@ class ArvadosFile(object): with self.lock: if len(self._segments) != len(othersegs): return False - for i in xrange(0, len(othersegs)): + for i in range(0, len(othersegs)): seg1 = self._segments[i] seg2 = othersegs[i] loc1 = seg1.locator @@ -768,23 +997,64 @@ class ArvadosFile(object): return not self.__eq__(other) @synchronized - def set_committed(self): - """Set committed flag to False""" - self._committed = True + def set_segments(self, segs): + self._segments = segs + + @synchronized + def set_committed(self, value=True): + """Set committed flag. + + If value is True, set committed to be True. + + If value is False, set committed to be False for this and all parents. + """ + if value == self._committed: + return + self._committed = value + if self._committed is False and self.parent is not None: + self.parent.set_committed(False) @synchronized def committed(self): """Get whether this is committed or not.""" return self._committed + @synchronized + def add_writer(self, writer): + """Add an ArvadosFileWriter reference to the list of writers""" + if isinstance(writer, ArvadosFileWriter): + self._writers.add(writer) + + @synchronized + def remove_writer(self, writer, flush): + """ + Called from ArvadosFileWriter.close(). Remove a writer reference from the list + and do some block maintenance tasks. + """ + self._writers.remove(writer) + + if flush or self.size() > config.KEEP_BLOCK_SIZE // 2: + # File writer closed, not small enough for repacking + self.flush() + elif self.closed(): + # All writers closed and size is adequate for repacking + self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size()) + + def closed(self): + """ + Get whether this is closed or not. When the writers list is empty, the file + is supposed to be closed. + """ + return len(self._writers) == 0 + @must_be_writable @synchronized def truncate(self, size): - """Shrink the size of the file. + """Shrink or expand the size of the file. If `size` is less than the size of the file, the file contents after `size` will be discarded. If `size` is greater than the current size - of the file, an IOError will be raised. + of the file, it will be filled with zero bytes. """ if size < self.size(): @@ -803,9 +1073,19 @@ class ArvadosFile(object): new_segs.append(r) self._segments = new_segs - self._committed = False + self.set_committed(False) elif size > self.size(): - raise IOError(errno.EINVAL, "truncate() does not support extending the file size") + padding = self.parent._my_block_manager().get_padding_block() + diff = size - self.size() + while diff > config.KEEP_BLOCK_SIZE: + self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0)) + diff -= config.KEEP_BLOCK_SIZE + if diff > 0: + self._segments.append(Range(padding.blockid, self.size(), diff, 0)) + self.set_committed(False) + else: + # size == self.size() + pass def readfrom(self, offset, size, num_retries, exact=False): """Read up to `size` bytes from the file starting at `offset`. @@ -818,7 +1098,7 @@ class ArvadosFile(object): with self.lock: if size == 0 or offset >= self.size(): - return '' + return b'' readsegs = locators_and_ranges(self._segments, offset, size) prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32) @@ -838,33 +1118,7 @@ class ArvadosFile(object): self.parent._my_block_manager().block_prefetch(lr.locator) locs.add(lr.locator) - return ''.join(data) - - def _repack_writes(self, num_retries): - """Test if the buffer block has more data than actual segments. - - This happens when a buffered write over-writes a file range written in - a previous buffered write. Re-pack the buffer block for efficiency - and to avoid leaking information. - - """ - segs = self._segments - - # Sum up the segments to get the total bytes of the file referencing - # into the buffer block. - bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid] - write_total = sum([s.range_size for s in bufferblock_segs]) - - if write_total < self._current_bblock.size(): - # There is more data in the buffer block than is actually accounted for by segments, so - # re-pack into a new buffer by copying over to a new buffer block. - contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries) - new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self) - for t in bufferblock_segs: - new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) - t.segment_offset = new_bb.size() - t.range_size - - self._current_bblock = new_bb + return b''.join(data) @must_be_writable @synchronized @@ -875,11 +1129,13 @@ class ArvadosFile(object): necessary. """ + if not isinstance(data, bytes) and not isinstance(data, memoryview): + data = data.encode() if len(data) == 0: return if offset > self.size(): - raise ArgumentError("Offset is past the end of the file") + self.truncate(offset) if len(data) > config.KEEP_BLOCK_SIZE: # Chunk it up into smaller writes @@ -890,13 +1146,13 @@ class ArvadosFile(object): n += config.KEEP_BLOCK_SIZE return - self._committed = False + self.set_committed(False) if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: - self._repack_writes(num_retries) + self._current_bblock.repack_writes() if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) @@ -923,8 +1179,9 @@ class ArvadosFile(object): if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: if self._current_bblock.state() == _BufferBlock.WRITABLE: - self._repack_writes(num_retries) - self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) + self._current_bblock.repack_writes() + if self._current_bblock.state() != _BufferBlock.DELETED: + self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) if sync: to_delete = set() @@ -936,7 +1193,10 @@ class ArvadosFile(object): to_delete.add(s.locator) s.locator = bb.locator() for s in to_delete: - self.parent._my_block_manager().delete_bufferblock(s) + # Don't delete the bufferblock if it's owned by many files. It'll be + # deleted after all of its owners are flush()ed. + if self.parent._my_block_manager().get_bufferblock(s).owner is self: + self.parent._my_block_manager().delete_bufferblock(s) self.parent.notify(MOD, self.parent, self.name, (self, self)) @@ -953,7 +1213,7 @@ class ArvadosFile(object): def _add_segment(self, blocks, pos, size): """Internal implementation of add_segment.""" - self._committed = False + self.set_committed(False) for lr in locators_and_ranges(blocks, pos, size): last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) @@ -969,25 +1229,28 @@ class ArvadosFile(object): return 0 @synchronized - def manifest_text(self, stream_name=".", portable_locators=False, normalize=False): + def manifest_text(self, stream_name=".", portable_locators=False, + normalize=False, only_committed=False): buf = "" filestream = [] - for segment in self.segments: + for segment in self._segments: loc = segment.locator - if loc.startswith("bufferblock"): - loc = self._bufferblocks[loc].calculate_locator() + if self.parent._my_block_manager().is_bufferblock(loc): + if only_committed: + continue + loc = self.parent._my_block_manager().get_bufferblock(loc).locator() if portable_locators: loc = KeepLocator(loc).stripped() - filestream.append(LocatorAndRange(loc, locator_block_size(loc), + filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, segment.segment_offset, segment.range_size)) - buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream})) + buf += ' '.join(normalize_stream(stream_name, {self.name: filestream})) buf += "\n" return buf @must_be_writable @synchronized def _reparent(self, newparent, newname): - self._committed = False + self.set_committed(False) self.flush(sync=True) self.parent.remove(self.name) self.parent = newparent @@ -1003,8 +1266,8 @@ class ArvadosFileReader(ArvadosFileReaderBase): """ - def __init__(self, arvadosfile, num_retries=None): - super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries) + def __init__(self, arvadosfile, mode="r", num_retries=None): + super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries) self.arvadosfile = arvadosfile def size(self): @@ -1013,6 +1276,11 @@ class ArvadosFileReader(ArvadosFileReaderBase): def stream_name(self): return self.arvadosfile.parent.stream_name() + def readinto(self, b): + data = self.read(len(b)) + b[:len(data)] = data + return len(data) + @_FileLikeObjectBase._before_close @retry_method def read(self, size=None, num_retries=None): @@ -1028,7 +1296,7 @@ class ArvadosFileReader(ArvadosFileReaderBase): data.append(rd) self._filepos += len(rd) rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) - return ''.join(data) + return b''.join(data) else: data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True) self._filepos += len(data) @@ -1056,17 +1324,19 @@ class ArvadosFileWriter(ArvadosFileReader): """ def __init__(self, arvadosfile, mode, num_retries=None): - super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries) - self.mode = mode + super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) + self.arvadosfile.add_writer(self) + + def writable(self): + return True @_FileLikeObjectBase._before_close @retry_method def write(self, data, num_retries=None): if self.mode[0] == "a": - self.arvadosfile.writeto(self.size(), data, num_retries) - else: - self.arvadosfile.writeto(self._filepos, data, num_retries) - self._filepos += len(data) + self._filepos = self.size() + self.arvadosfile.writeto(self._filepos, data, num_retries) + self._filepos += len(data) return len(data) @_FileLikeObjectBase._before_close @@ -1080,14 +1350,42 @@ class ArvadosFileWriter(ArvadosFileReader): if size is None: size = self._filepos self.arvadosfile.truncate(size) - if self._filepos > self.size(): - self._filepos = self.size() @_FileLikeObjectBase._before_close def flush(self): self.arvadosfile.flush() - def close(self): + def close(self, flush=True): if not self.closed: - self.flush() + self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close() + + +class WrappableFile(object): + """An interface to an Arvados file that's compatible with io wrappers. + + """ + def __init__(self, f): + self.f = f + self.closed = False + def close(self): + self.closed = True + return self.f.close() + def flush(self): + return self.f.flush() + def read(self, *args, **kwargs): + return self.f.read(*args, **kwargs) + def readable(self): + return self.f.readable() + def readinto(self, *args, **kwargs): + return self.f.readinto(*args, **kwargs) + def seek(self, *args, **kwargs): + return self.f.seek(*args, **kwargs) + def seekable(self): + return self.f.seekable() + def tell(self): + return self.f.tell() + def writable(self): + return self.f.writable() + def write(self, *args, **kwargs): + return self.f.write(*args, **kwargs)