X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fea4d2ba4ab741daff3fd17d910b72539a50a447..0e0c1400b57d5de8aa8c18dd4897527f905a4b42:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 792c81f1e6..c6cb1c91cc 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -1,17 +1,25 @@ +from __future__ import absolute_import +from __future__ import division +from future import standard_library +standard_library.install_aliases() +from builtins import range +from builtins import object import functools import os import zlib import bz2 -import config +from . import config import hashlib import threading -import Queue +import queue import copy import errno import re import logging +import collections +import uuid -from .errors import KeepWriteError, AssertionError +from .errors import KeepWriteError, AssertionError, ArgumentError from .keep import KeepLocator from ._normalize_stream import normalize_stream from ._ranges import locators_and_ranges, replace_range, Range @@ -36,6 +44,12 @@ def split(path): stream_name, file_name = '.', path return stream_name, file_name + +class UnownedBlockError(Exception): + """Raised when there's an writable block without an owner on the BlockManager.""" + pass + + class _FileLikeObjectBase(object): def __init__(self, name, mode): self.name = name @@ -68,7 +82,7 @@ class _FileLikeObjectBase(object): class ArvadosFileReaderBase(_FileLikeObjectBase): def __init__(self, name, mode, num_retries=None): super(ArvadosFileReaderBase, self).__init__(name, mode) - self._filepos = 0L + self._filepos = 0 self.num_retries = num_retries self._readline_cache = (None, None) @@ -88,7 +102,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): pos += self._filepos elif whence == os.SEEK_END: pos += self.size() - self._filepos = min(max(pos, 0L), self.size()) + self._filepos = min(max(pos, 0), self.size()) def tell(self): return self._filepos @@ -108,6 +122,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): cache_pos, cache_data = self._readline_cache if self.tell() == cache_pos: data = [cache_data] + self._filepos += len(cache_data) else: data = [''] data_size = len(data[-1]) @@ -123,13 +138,14 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): except ValueError: nextline_index = len(data) nextline_index = min(nextline_index, size) + self._filepos -= len(data) - nextline_index self._readline_cache = (self.tell(), data[nextline_index:]) return data[:nextline_index] @_FileLikeObjectBase._before_close @retry_method def decompress(self, decompress, size, num_retries=None): - for segment in self.readall(size, num_retries): + for segment in self.readall(size, num_retries=num_retries): data = decompress(segment) if data: yield data @@ -203,8 +219,8 @@ class StreamFileReader(ArvadosFileReaderBase): if available_chunks: lr = available_chunks[0] data = self._stream.readfrom(lr.locator+lr.segment_offset, - lr.segment_size, - num_retries=num_retries) + lr.segment_size, + num_retries=num_retries) self._filepos += len(data) return data @@ -236,6 +252,13 @@ def synchronized(orig_func): return orig_func(self, *args, **kwargs) return synchronized_wrapper + +class StateChangeError(Exception): + def __init__(self, message, state, nextstate): + super(StateChangeError, self).__init__(message) + self.state = state + self.nextstate = nextstate + class _BufferBlock(object): """A stand-in for a Keep block that is in the process of being written. @@ -259,6 +282,7 @@ class _BufferBlock(object): WRITABLE = 0 PENDING = 1 COMMITTED = 2 + ERROR = 3 def __init__(self, blockid, starting_capacity, owner): """ @@ -280,6 +304,8 @@ class _BufferBlock(object): self._locator = None self.owner = owner self.lock = threading.Lock() + self.wait_for_commit = threading.Event() + self.error = None @synchronized def append(self, data): @@ -301,17 +327,30 @@ class _BufferBlock(object): else: raise AssertionError("Buffer block is not writable") + STATE_TRANSITIONS = frozenset([ + (WRITABLE, PENDING), + (PENDING, COMMITTED), + (PENDING, ERROR), + (ERROR, PENDING)]) + @synchronized - def set_state(self, nextstate, loc=None): - if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or - (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)): - self._state = nextstate - if self._state == _BufferBlock.COMMITTED: - self._locator = loc - self.buffer_view = None - self.buffer_block = None - else: - raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate)) + def set_state(self, nextstate, val=None): + if (self._state, nextstate) not in self.STATE_TRANSITIONS: + raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate) + self._state = nextstate + + if self._state == _BufferBlock.PENDING: + self.wait_for_commit.clear() + + if self._state == _BufferBlock.COMMITTED: + self._locator = val + self.buffer_view = None + self.buffer_block = None + self.wait_for_commit.set() + + if self._state == _BufferBlock.ERROR: + self.error = val + self.wait_for_commit.set() @synchronized def state(self): @@ -331,7 +370,7 @@ class _BufferBlock(object): @synchronized def clone(self, new_blockid, owner): if self._state == _BufferBlock.COMMITTED: - raise AssertionError("Can only duplicate a writable or pending buffer block") + raise AssertionError("Cannot duplicate committed buffer block") bufferblock = _BufferBlock(new_blockid, self.size(), owner) bufferblock.append(self.buffer_view[0:self.size()]) return bufferblock @@ -361,7 +400,7 @@ def must_be_writable(orig_func): @functools.wraps(orig_func) def must_be_writable_wrapper(self, *args, **kwargs): if not self.writable(): - raise IOError(errno.EROFS, "Collection must be writable.") + raise IOError(errno.EROFS, "Collection is read-only.") return orig_func(self, *args, **kwargs) return must_be_writable_wrapper @@ -373,19 +412,28 @@ class _BlockManager(object): Collection of ArvadosFiles. """ - def __init__(self, keep): + + DEFAULT_PUT_THREADS = 2 + DEFAULT_GET_THREADS = 2 + + def __init__(self, keep, copies=None, put_threads=None): """keep: KeepClient object to use""" self._keep = keep - self._bufferblocks = {} + self._bufferblocks = collections.OrderedDict() self._put_queue = None - self._put_errors = None self._put_threads = None self._prefetch_queue = None self._prefetch_threads = None self.lock = threading.Lock() self.prefetch_enabled = True - self.num_put_threads = 2 - self.num_get_threads = 2 + if put_threads: + self.num_put_threads = put_threads + else: + self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS + self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.copies = copies + self._pending_write_size = 0 + self.threads_lock = threading.Lock() @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -401,8 +449,11 @@ class _BlockManager(object): ArvadosFile that owns this block """ + return self._alloc_bufferblock(blockid, starting_capacity, owner) + + def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): if blockid is None: - blockid = "bufferblock%i" % len(self._bufferblocks) + blockid = "%s" % uuid.uuid4() bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -427,6 +478,73 @@ class _BlockManager(object): def is_bufferblock(self, locator): return locator in self._bufferblocks + def _commit_bufferblock_worker(self): + """Background uploader thread.""" + + while True: + try: + bufferblock = self._put_queue.get() + if bufferblock is None: + return + + if self.copies is None: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + else: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) + bufferblock.set_state(_BufferBlock.COMMITTED, loc) + + except Exception as e: + bufferblock.set_state(_BufferBlock.ERROR, e) + finally: + if self._put_queue is not None: + self._put_queue.task_done() + + def start_put_threads(self): + with self.threads_lock: + if self._put_threads is None: + # Start uploader threads. + + # If we don't limit the Queue size, the upload queue can quickly + # grow to take up gigabytes of RAM if the writing process is + # generating data more quickly than it can be send to the Keep + # servers. + # + # With two upload threads and a queue size of 2, this means up to 4 + # blocks pending. If they are full 64 MiB blocks, that means up to + # 256 MiB of internal buffering, which is the same size as the + # default download block cache in KeepClient. + self._put_queue = queue.Queue(maxsize=2) + + self._put_threads = [] + for i in range(0, self.num_put_threads): + thread = threading.Thread(target=self._commit_bufferblock_worker) + self._put_threads.append(thread) + thread.daemon = True + thread.start() + + def _block_prefetch_worker(self): + """The background downloader thread.""" + while True: + try: + b = self._prefetch_queue.get() + if b is None: + return + self._keep.get(b) + except Exception: + _logger.exception("Exception doing block prefetch") + + @synchronized + def start_get_threads(self): + if self._prefetch_threads is None: + self._prefetch_queue = queue.Queue() + self._prefetch_threads = [] + for i in range(0, self.num_get_threads): + thread = threading.Thread(target=self._block_prefetch_worker) + self._prefetch_threads.append(thread) + thread.daemon = True + thread.start() + + @synchronized def stop_threads(self): """Shut down and wait for background upload and download threads to finish.""" @@ -438,7 +556,6 @@ class _BlockManager(object): t.join() self._put_threads = None self._put_queue = None - self._put_errors = None if self._prefetch_threads is not None: for t in self._prefetch_threads: @@ -448,71 +565,94 @@ class _BlockManager(object): self._prefetch_threads = None self._prefetch_queue = None - def commit_bufferblock(self, block, wait): + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.stop_threads() + + @synchronized + def repack_small_blocks(self, force=False, sync=False, closed_file_size=0): + """Packs small blocks together before uploading""" + self._pending_write_size += closed_file_size + + # Check if there are enough small blocks for filling up one in full + if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE): + + # Search blocks ready for getting packed together before being committed to Keep. + # A WRITABLE block always has an owner. + # A WRITABLE block with its owner.closed() implies that it's + # size is <= KEEP_BLOCK_SIZE/2. + try: + small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + except AttributeError: + # Writable blocks without owner shouldn't exist. + raise UnownedBlockError() + + if len(small_blocks) <= 1: + # Not enough small blocks for repacking + return + + # Update the pending write size count with its true value, just in case + # some small file was opened, written and closed several times. + self._pending_write_size = sum([b.size() for b in small_blocks]) + if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: + return + + new_bb = self._alloc_bufferblock() + while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: + bb = small_blocks.pop(0) + arvfile = bb.owner + self._pending_write_size -= bb.size() + new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) + arvfile.set_segments([Range(new_bb.blockid, + 0, + bb.size(), + new_bb.write_pointer - bb.size())]) + self._delete_bufferblock(bb.blockid) + self.commit_bufferblock(new_bb, sync=sync) + + def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. :block: The block object to upload - :wait: - If `wait` is True, upload the block synchronously. - If `wait` is False, upload the block asynchronously. This will - return immediately unless if the upload queue is at capacity, in + :sync: + If `sync` is True, upload the block synchronously. + If `sync` is False, upload the block asynchronously. This will + return immediately unless the upload queue is at capacity, in which case it will wait on an upload queue slot. """ - - def commit_bufferblock_worker(self): - """Background uploader thread.""" - - while True: - try: - bufferblock = self._put_queue.get() - if bufferblock is None: - return - - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) - bufferblock.set_state(_BufferBlock.COMMITTED, loc) - - except Exception as e: - self._put_errors.put((bufferblock.locator(), e)) - finally: - if self._put_queue is not None: - self._put_queue.task_done() - - if block.state() != _BufferBlock.WRITABLE: - return - - if wait: - block.set_state(_BufferBlock.PENDING) - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) - block.set_state(_BufferBlock.COMMITTED, loc) - else: - with self.lock: - if self._put_threads is None: - # Start uploader threads. - - # If we don't limit the Queue size, the upload queue can quickly - # grow to take up gigabytes of RAM if the writing process is - # generating data more quickly than it can be send to the Keep - # servers. - # - # With two upload threads and a queue size of 2, this means up to 4 - # blocks pending. If they are full 64 MiB blocks, that means up to - # 256 MiB of internal buffering, which is the same size as the - # default download block cache in KeepClient. - self._put_queue = Queue.Queue(maxsize=2) - self._put_errors = Queue.Queue() - - self._put_threads = [] - for i in xrange(0, self.num_put_threads): - thread = threading.Thread(target=commit_bufferblock_worker, args=(self,)) - self._put_threads.append(thread) - thread.daemon = True - thread.start() - + try: # Mark the block as PENDING so to disallow any more appends. block.set_state(_BufferBlock.PENDING) + except StateChangeError as e: + if e.state == _BufferBlock.PENDING: + if sync: + block.wait_for_commit.wait() + else: + return + if block.state() == _BufferBlock.COMMITTED: + return + elif block.state() == _BufferBlock.ERROR: + raise block.error + else: + raise + + if sync: + try: + if self.copies is None: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + else: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) + block.set_state(_BufferBlock.COMMITTED, loc) + except Exception as e: + block.set_state(_BufferBlock.ERROR, e) + raise + else: + self.start_put_threads() self._put_queue.put(block) @synchronized @@ -521,6 +661,9 @@ class _BlockManager(object): @synchronized def delete_bufferblock(self, locator): + self._delete_bufferblock(locator) + + def _delete_bufferblock(self, locator): bb = self._bufferblocks[locator] bb.clear() del self._bufferblocks[locator] @@ -547,37 +690,35 @@ class _BlockManager(object): def commit_all(self): """Commit all outstanding buffer blocks. - Unlike commit_bufferblock(), this is a synchronous call, and will not - return until all buffer blocks are uploaded. Raises - KeepWriteError() if any blocks failed to upload. + This is a synchronous call, and will not return until all buffer blocks + are uploaded. Raises KeepWriteError() if any blocks failed to upload. """ + self.repack_small_blocks(force=True, sync=True) + with self.lock: - items = self._bufferblocks.items() + items = list(self._bufferblocks.items()) for k,v in items: - if v.state() == _BufferBlock.WRITABLE: - v.owner.flush(False) + if v.state() != _BufferBlock.COMMITTED and v.owner: + v.owner.flush(sync=False) with self.lock: if self._put_queue is not None: self._put_queue.join() - if not self._put_errors.empty(): - err = [] - try: - while True: - err.append(self._put_errors.get(False)) - except Queue.Empty: - pass + err = [] + for k,v in items: + if v.state() == _BufferBlock.ERROR: + err.append((v.locator(), v.error)) + if err: raise KeepWriteError("Error writing some blocks", err, label="block") for k,v in items: - # flush again with wait=True to remove committed bufferblocks from + # flush again with sync=True to remove committed bufferblocks from # the segments. if v.owner: - v.owner.flush(True) - + v.owner.flush(sync=True) def block_prefetch(self, locator): """Initiate a background download of a block. @@ -592,28 +733,14 @@ class _BlockManager(object): if not self.prefetch_enabled: return - def block_prefetch_worker(self): - """The background downloader thread.""" - while True: - try: - b = self._prefetch_queue.get() - if b is None: - return - self._keep.get(b) - except Exception: - pass + if self._keep.get_from_cache(locator) is not None: + return with self.lock: if locator in self._bufferblocks: return - if self._prefetch_threads is None: - self._prefetch_queue = Queue.Queue() - self._prefetch_threads = [] - for i in xrange(0, self.num_get_threads): - thread = threading.Thread(target=block_prefetch_worker, args=(self,)) - self._prefetch_threads.append(thread) - thread.daemon = True - thread.start() + + self.start_get_threads() self._prefetch_queue.put(locator) @@ -640,7 +767,8 @@ class ArvadosFile(object): """ self.parent = parent self.name = name - self._modified = True + self._writers = set() + self._committed = False self._segments = [] self.lock = parent.root_collection().lock for s in segments: @@ -650,6 +778,14 @@ class ArvadosFile(object): def writable(self): return self.parent.writable() + @synchronized + def permission_expired(self, as_of_dt=None): + """Returns True if any of the segment's locators is expired""" + for r in self._segments: + if KeepLocator(r.locator).permission_expired(as_of_dt): + return True + return False + @synchronized def segments(self): return copy.copy(self._segments) @@ -681,7 +817,7 @@ class ArvadosFile(object): self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) - self._modified = True + self.set_committed(False) def __eq__(self, other): if other is self: @@ -693,7 +829,7 @@ class ArvadosFile(object): with self.lock: if len(self._segments) != len(othersegs): return False - for i in xrange(0, len(othersegs)): + for i in range(0, len(othersegs)): seg1 = self._segments[i] seg2 = othersegs[i] loc1 = seg1.locator @@ -717,14 +853,55 @@ class ArvadosFile(object): return not self.__eq__(other) @synchronized - def set_unmodified(self): - """Clear the modified flag""" - self._modified = False + def set_segments(self, segs): + self._segments = segs + + @synchronized + def set_committed(self, value=True): + """Set committed flag. + + If value is True, set committed to be True. + + If value is False, set committed to be False for this and all parents. + """ + if value == self._committed: + return + self._committed = value + if self._committed is False and self.parent is not None: + self.parent.set_committed(False) @synchronized - def modified(self): - """Test the modified flag""" - return self._modified + def committed(self): + """Get whether this is committed or not.""" + return self._committed + + @synchronized + def add_writer(self, writer): + """Add an ArvadosFileWriter reference to the list of writers""" + if isinstance(writer, ArvadosFileWriter): + self._writers.add(writer) + + @synchronized + def remove_writer(self, writer, flush): + """ + Called from ArvadosFileWriter.close(). Remove a writer reference from the list + and do some block maintenance tasks. + """ + self._writers.remove(writer) + + if flush or self.size() > config.KEEP_BLOCK_SIZE // 2: + # File writer closed, not small enough for repacking + self.flush() + elif self.closed(): + # All writers closed and size is adequate for repacking + self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size()) + + def closed(self): + """ + Get whether this is closed or not. When the writers list is empty, the file + is supposed to be closed. + """ + return len(self._writers) == 0 @must_be_writable @synchronized @@ -752,7 +929,7 @@ class ArvadosFile(object): new_segs.append(r) self._segments = new_segs - self._modified = True + self.set_committed(False) elif size > self.size(): raise IOError(errno.EINVAL, "truncate() does not support extending the file size") @@ -768,19 +945,25 @@ class ArvadosFile(object): with self.lock: if size == 0 or offset >= self.size(): return '' - prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE) readsegs = locators_and_ranges(self._segments, offset, size) + prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32) - for lr in prefetch: - self.parent._my_block_manager().block_prefetch(lr.locator) - + locs = set() data = [] for lr in readsegs: block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact)) if block: - data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size]) + blockview = memoryview(block) + data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes()) + locs.add(lr.locator) else: break + + for lr in prefetch: + if lr.locator not in locs: + self.parent._my_block_manager().block_prefetch(lr.locator) + locs.add(lr.locator) + return ''.join(data) def _repack_writes(self, num_retries): @@ -825,9 +1008,15 @@ class ArvadosFile(object): raise ArgumentError("Offset is past the end of the file") if len(data) > config.KEEP_BLOCK_SIZE: - raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE)) + # Chunk it up into smaller writes + n = 0 + dataview = memoryview(data) + while n < len(data): + self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries) + n += config.KEEP_BLOCK_SIZE + return - self._modified = True + self.set_committed(False) if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) @@ -835,7 +1024,7 @@ class ArvadosFile(object): if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: self._repack_writes(num_retries) if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: - self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False) + self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) self._current_bblock.append(data) @@ -847,25 +1036,35 @@ class ArvadosFile(object): return len(data) @synchronized - def flush(self, wait=True, num_retries=0): - """Flush bufferblocks to Keep.""" - if self.modified(): - if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE: - self._repack_writes(num_retries) - self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait) - if wait: - to_delete = set() - for s in self._segments: - bb = self.parent._my_block_manager().get_bufferblock(s.locator) - if bb: - if bb.state() != _BufferBlock.COMMITTED: - _logger.error("bufferblock %s is not committed" % (s.locator)) - else: - to_delete.add(s.locator) - s.locator = bb.locator() - for s in to_delete: - self.parent._my_block_manager().delete_bufferblock(s) + def flush(self, sync=True, num_retries=0): + """Flush the current bufferblock to Keep. + + :sync: + If True, commit block synchronously, wait until buffer block has been written. + If False, commit block asynchronously, return immediately after putting block into + the keep put queue. + """ + if self.committed(): + return + if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: + if self._current_bblock.state() == _BufferBlock.WRITABLE: + self._repack_writes(num_retries) + self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) + + if sync: + to_delete = set() + for s in self._segments: + bb = self.parent._my_block_manager().get_bufferblock(s.locator) + if bb: + if bb.state() != _BufferBlock.COMMITTED: + self.parent._my_block_manager().commit_bufferblock(bb, sync=True) + to_delete.add(s.locator) + s.locator = bb.locator() + for s in to_delete: + self.parent._my_block_manager().delete_bufferblock(s) + + self.parent.notify(MOD, self.parent, self.name, (self, self)) @must_be_writable @synchronized @@ -880,7 +1079,7 @@ class ArvadosFile(object): def _add_segment(self, blocks, pos, size): """Internal implementation of add_segment.""" - self._modified = True + self.set_committed(False) for lr in locators_and_ranges(blocks, pos, size): last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) @@ -896,12 +1095,15 @@ class ArvadosFile(object): return 0 @synchronized - def manifest_text(self, stream_name=".", portable_locators=False, normalize=False): + def manifest_text(self, stream_name=".", portable_locators=False, + normalize=False, only_committed=False): buf = "" filestream = [] for segment in self.segments: loc = segment.locator - if loc.startswith("bufferblock"): + if self.parent._my_block_manager().is_bufferblock(loc): + if only_committed: + continue loc = self._bufferblocks[loc].calculate_locator() if portable_locators: loc = KeepLocator(loc).stripped() @@ -914,8 +1116,8 @@ class ArvadosFile(object): @must_be_writable @synchronized def _reparent(self, newparent, newname): - self._modified = True - self.flush() + self.set_committed(False) + self.flush(sync=True) self.parent.remove(self.name) self.parent = newparent self.name = newname @@ -930,8 +1132,8 @@ class ArvadosFileReader(ArvadosFileReaderBase): """ - def __init__(self, arvadosfile, mode="r", num_retries=None): - super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries) + def __init__(self, arvadosfile, num_retries=None): + super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries) self.arvadosfile = arvadosfile def size(self): @@ -983,7 +1185,9 @@ class ArvadosFileWriter(ArvadosFileReader): """ def __init__(self, arvadosfile, mode, num_retries=None): - super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries) + super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries) + self.mode = mode + self.arvadosfile.add_writer(self) @_FileLikeObjectBase._before_close @retry_method @@ -999,7 +1203,7 @@ class ArvadosFileWriter(ArvadosFileReader): @retry_method def writelines(self, seq, num_retries=None): for s in seq: - self.write(s, num_retries) + self.write(s, num_retries=num_retries) @_FileLikeObjectBase._before_close def truncate(self, size=None): @@ -1013,7 +1217,7 @@ class ArvadosFileWriter(ArvadosFileReader): def flush(self): self.arvadosfile.flush() - def close(self): + def close(self, flush=True): if not self.closed: - self.flush() + self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close()