X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d094ae4ee58f26e0585445eccb3be2d019ab020f..1d9e4de7a4ff994cfc7a9319dcae56bb26c272b3:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index f00936d743..6893b94bf7 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -1,3 +1,7 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + from __future__ import absolute_import from __future__ import division from future import standard_library @@ -84,9 +88,6 @@ class _FileLikeObjectBase(object): class ArvadosFileReaderBase(_FileLikeObjectBase): def __init__(self, name, mode, num_retries=None): super(ArvadosFileReaderBase, self).__init__(name, mode) - self._binary = 'b' in mode - if sys.version_info >= (3, 0) and not self._binary: - raise NotImplementedError("text mode {!r} is not implemented".format(mode)) self._filepos = 0 self.num_retries = num_retries self._readline_cache = (None, None) @@ -480,7 +481,7 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep, copies=None, put_threads=None): + def __init__(self, keep, copies=None, put_threads=None, num_retries=None): """keep: KeepClient object to use""" self._keep = keep self._bufferblocks = collections.OrderedDict() @@ -499,8 +500,7 @@ class _BlockManager(object): self._pending_write_size = 0 self.threads_lock = threading.Lock() self.padding_block = None - self._repacked_bb = {} - self._repacked_bb_lock = threading.Lock() + self.num_retries = num_retries @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -555,21 +555,10 @@ class _BlockManager(object): return if self.copies is None: - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries) else: - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies) bufferblock.set_state(_BufferBlock.COMMITTED, loc) - - with self._repacked_bb_lock: - # Check if this block was created by repacking smaller blocks - if bufferblock.blockid in self._repacked_bb: - # Update segment locators (with its tokens) of files within - # this block - old_loc = self._repacked_bb[bufferblock.blockid]['unsigned_loc'] - for f in self._repacked_bb[bufferblock.blockid]['files']: - for s in [x for x in f._segments if x.locator == old_loc]: - s.locator = loc - del(self._repacked_bb[bufferblock.blockid]) except Exception as e: bufferblock.set_state(_BufferBlock.ERROR, e) finally: @@ -685,34 +674,25 @@ class _BlockManager(object): return new_bb = self._alloc_bufferblock() + new_bb.owner = [] files = [] while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: bb = small_blocks.pop(0) + new_bb.owner.append(bb.owner) self._pending_write_size -= bb.size() new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) files.append((bb, new_bb.write_pointer - bb.size())) - # If this repacked block will be committed asynchronously, take note - # of its files so their segments' locators will be updated with - # the correct permission token returned by the API server. - if not sync: - with self._repacked_bb_lock: - self._repacked_bb[new_bb.blockid] = { - 'unsigned_loc': new_bb.locator(), - 'files': [bb.owner for bb, _ in files], - } - self.commit_bufferblock(new_bb, sync=sync) - with self._repacked_bb_lock: - for bb, new_bb_segment_offset in files: - newsegs = bb.owner.segments() - for s in newsegs: - if s.locator == bb.blockid: - s.locator = new_bb.locator() - s.segment_offset = new_bb_segment_offset+s.segment_offset - bb.owner.set_segments(newsegs) - self._delete_bufferblock(bb.blockid) + for bb, new_bb_segment_offset in files: + newsegs = bb.owner.segments() + for s in newsegs: + if s.locator == bb.blockid: + s.locator = new_bb.blockid + s.segment_offset = new_bb_segment_offset+s.segment_offset + bb.owner.set_segments(newsegs) + self._delete_bufferblock(bb.blockid) def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. @@ -746,9 +726,9 @@ class _BlockManager(object): if sync: try: if self.copies is None: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries) else: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies) block.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: block.set_state(_BufferBlock.ERROR, e) @@ -819,7 +799,10 @@ class _BlockManager(object): for k,v in items: if v.state() != _BufferBlock.COMMITTED and v.owner: - v.owner.flush(sync=False) + # Ignore blocks with a list of owners, as if they're not in COMMITTED + # state, they're already being committed asynchronously. + if isinstance(v.owner, ArvadosFile): + v.owner.flush(sync=False) with self.lock: if self._put_queue is not None: @@ -836,7 +819,15 @@ class _BlockManager(object): # flush again with sync=True to remove committed bufferblocks from # the segments. if v.owner: - v.owner.flush(sync=True) + if isinstance(v.owner, ArvadosFile): + v.owner.flush(sync=True) + elif isinstance(v.owner, list) and len(v.owner) > 0: + # This bufferblock is referenced by many files as a result + # of repacking small blocks, so don't delete it when flushing + # its owners, just do it after flushing them all. + for owner in v.owner: + owner.flush(sync=True) + self.delete_bufferblock(k) def block_prefetch(self, locator): """Initiate a background download of a block. @@ -873,6 +864,9 @@ class ArvadosFile(object): """ + __slots__ = ('parent', 'name', '_writers', '_committed', + '_segments', 'lock', '_current_bblock', 'fuse_entry') + def __init__(self, parent, name, stream=[], segments=[]): """ ArvadosFile constructor. @@ -904,6 +898,38 @@ class ArvadosFile(object): return True return False + @synchronized + def has_remote_blocks(self): + """Returns True if any of the segment's locators has a +R signature""" + + for s in self._segments: + if '+R' in s.locator: + return True + return False + + @synchronized + def _copy_remote_blocks(self, remote_blocks={}): + """Ask Keep to copy remote blocks and point to their local copies. + + This is called from the parent Collection. + + :remote_blocks: + Shared cache of remote to local block mappings. This is used to avoid + doing extra work when blocks are shared by more than one file in + different subdirectories. + """ + + for s in self._segments: + if '+R' in s.locator: + try: + loc = remote_blocks[s.locator] + except KeyError: + loc = self.parent._my_keep().refresh_signature(s.locator) + remote_blocks[s.locator] = loc + s.locator = loc + self.parent.set_committed(False) + return remote_blocks + @synchronized def segments(self): return copy.copy(self._segments) @@ -1167,7 +1193,10 @@ class ArvadosFile(object): to_delete.add(s.locator) s.locator = bb.locator() for s in to_delete: - self.parent._my_block_manager().delete_bufferblock(s) + # Don't delete the bufferblock if it's owned by many files. It'll be + # deleted after all of its owners are flush()ed. + if self.parent._my_block_manager().get_bufferblock(s).owner is self: + self.parent._my_block_manager().delete_bufferblock(s) self.parent.notify(MOD, self.parent, self.name, (self, self)) @@ -1247,6 +1276,11 @@ class ArvadosFileReader(ArvadosFileReaderBase): def stream_name(self): return self.arvadosfile.parent.stream_name() + def readinto(self, b): + data = self.read(len(b)) + b[:len(data)] = data + return len(data) + @_FileLikeObjectBase._before_close @retry_method def read(self, size=None, num_retries=None): @@ -1325,3 +1359,33 @@ class ArvadosFileWriter(ArvadosFileReader): if not self.closed: self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close() + + +class WrappableFile(object): + """An interface to an Arvados file that's compatible with io wrappers. + + """ + def __init__(self, f): + self.f = f + self.closed = False + def close(self): + self.closed = True + return self.f.close() + def flush(self): + return self.f.flush() + def read(self, *args, **kwargs): + return self.f.read(*args, **kwargs) + def readable(self): + return self.f.readable() + def readinto(self, *args, **kwargs): + return self.f.readinto(*args, **kwargs) + def seek(self, *args, **kwargs): + return self.f.seek(*args, **kwargs) + def seekable(self): + return self.f.seekable() + def tell(self): + return self.f.tell() + def writable(self): + return self.f.writable() + def write(self, *args, **kwargs): + return self.f.write(*args, **kwargs)