+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from __future__ import absolute_import
from __future__ import division
from future import standard_library
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
-from ._ranges import locators_and_ranges, replace_range, Range
+from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method
MOD = "mod"
class ArvadosFileReaderBase(_FileLikeObjectBase):
def __init__(self, name, mode, num_retries=None):
super(ArvadosFileReaderBase, self).__init__(name, mode)
- self._binary = 'b' in mode
- if sys.version_info >= (3, 0) and not self._binary:
- raise NotImplementedError("text mode {!r} is not implemented".format(mode))
self._filepos = 0
self.num_retries = num_retries
self._readline_cache = (None, None)
PENDING = 1
COMMITTED = 2
ERROR = 3
+ DELETED = 4
def __init__(self, blockid, starting_capacity, owner):
"""
@synchronized
def clear(self):
+ self._state = _BufferBlock.DELETED
self.owner = None
self.buffer_block = None
self.buffer_view = None
+ @synchronized
+ def repack_writes(self):
+ """Optimize buffer block by repacking segments in file sequence.
+
+ When the client makes random writes, they appear in the buffer block in
+ the sequence they were written rather than the sequence they appear in
+ the file. This makes for inefficient, fragmented manifests. Attempt
+ to optimize by repacking writes in file sequence.
+
+ """
+ if self._state != _BufferBlock.WRITABLE:
+ raise AssertionError("Cannot repack non-writable block")
+
+ segs = self.owner.segments()
+
+ # Collect the segments that reference the buffer block.
+ bufferblock_segs = [s for s in segs if s.locator == self.blockid]
+
+ # Collect total data referenced by segments (could be smaller than
+ # bufferblock size if a portion of the file was written and
+ # then overwritten).
+ write_total = sum([s.range_size for s in bufferblock_segs])
+
+ if write_total < self.size() or len(bufferblock_segs) > 1:
+ # If there's more than one segment referencing this block, it is
+ # due to out-of-order writes and will produce a fragmented
+ # manifest, so try to optimize by re-packing into a new buffer.
+ contents = self.buffer_view[0:self.write_pointer].tobytes()
+ new_bb = _BufferBlock(None, write_total, None)
+ for t in bufferblock_segs:
+ new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
+ t.segment_offset = new_bb.size() - t.range_size
+
+ self.buffer_block = new_bb.buffer_block
+ self.buffer_view = new_bb.buffer_view
+ self.write_pointer = new_bb.write_pointer
+ self._locator = None
+ new_bb.clear()
+ self.owner.set_segments(segs)
+
+ def __repr__(self):
+ return "<BufferBlock %s>" % (self.blockid)
+
class NoopLock(object):
def __enter__(self):
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None):
+ def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- if put_threads:
- self.num_put_threads = put_threads
- else:
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
+ self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
+ self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
+ self.num_retries = num_retries
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if blockid is None:
- blockid = "%s" % uuid.uuid4()
+ blockid = str(uuid.uuid4())
bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
self._bufferblocks[bufferblock.blockid] = bufferblock
return bufferblock
ArvadosFile that owns the new block
"""
- new_blockid = "bufferblock%i" % len(self._bufferblocks)
+ new_blockid = str(uuid.uuid4())
bufferblock = block.clone(new_blockid, owner)
self._bufferblocks[bufferblock.blockid] = bufferblock
return bufferblock
return
if self.copies is None:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
-
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
finally:
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
+ # generating data more quickly than it can be sent to the Keep
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
b = self._prefetch_queue.get()
if b is None:
return
- self._keep.get(b)
+ self._keep.get(b, prefetch=True)
except Exception:
_logger.exception("Exception doing block prefetch")
@synchronized
def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
"""Packs small blocks together before uploading"""
+
self._pending_write_size += closed_file_size
# Check if there are enough small blocks for filling up one in full
- if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+ if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+ return
- # Search blocks ready for getting packed together before being committed to Keep.
- # A WRITABLE block always has an owner.
- # A WRITABLE block with its owner.closed() implies that it's
- # size is <= KEEP_BLOCK_SIZE/2.
- try:
- small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
- except AttributeError:
- # Writable blocks without owner shouldn't exist.
- raise UnownedBlockError()
+ # Search blocks ready for getting packed together before being
+ # committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block with its owner.closed() implies that its
+ # size is <= KEEP_BLOCK_SIZE/2.
+ try:
+ small_blocks = [b for b in listvalues(self._bufferblocks)
+ if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ except AttributeError:
+ # Writable blocks without owner shouldn't exist.
+ raise UnownedBlockError()
+
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
+ return
- if len(small_blocks) <= 1:
- # Not enough small blocks for repacking
- return
+ for bb in small_blocks:
+ bb.repack_writes()
- # Update the pending write size count with its true value, just in case
- # some small file was opened, written and closed several times.
- self._pending_write_size = sum([b.size() for b in small_blocks])
- if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
- return
+ # Update the pending write size count with its true value, just in case
+ # some small file was opened, written and closed several times.
+ self._pending_write_size = sum([b.size() for b in small_blocks])
- new_bb = self._alloc_bufferblock()
- while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
- bb = small_blocks.pop(0)
- arvfile = bb.owner
- self._pending_write_size -= bb.size()
- new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
- arvfile.set_segments([Range(new_bb.blockid,
- 0,
- bb.size(),
- new_bb.write_pointer - bb.size())])
- self._delete_bufferblock(bb.blockid)
- self.commit_bufferblock(new_bb, sync=sync)
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+ return
+
+ new_bb = self._alloc_bufferblock()
+ new_bb.owner = []
+ files = []
+ while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+ bb = small_blocks.pop(0)
+ new_bb.owner.append(bb.owner)
+ self._pending_write_size -= bb.size()
+ new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+ files.append((bb, new_bb.write_pointer - bb.size()))
+
+ self.commit_bufferblock(new_bb, sync=sync)
+
+ for bb, new_bb_segment_offset in files:
+ newsegs = bb.owner.segments()
+ for s in newsegs:
+ if s.locator == bb.blockid:
+ s.locator = new_bb.blockid
+ s.segment_offset = new_bb_segment_offset+s.segment_offset
+ bb.owner.set_segments(newsegs)
+ self._delete_bufferblock(bb.blockid)
def commit_bufferblock(self, block, sync):
"""Initiate a background upload of a bufferblock.
if sync:
try:
if self.copies is None:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
self._delete_bufferblock(locator)
def _delete_bufferblock(self, locator):
- bb = self._bufferblocks[locator]
- bb.clear()
- del self._bufferblocks[locator]
+ if locator in self._bufferblocks:
+ bb = self._bufferblocks[locator]
+ bb.clear()
+ del self._bufferblocks[locator]
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
for k,v in items:
if v.state() != _BufferBlock.COMMITTED and v.owner:
- v.owner.flush(sync=False)
+ # Ignore blocks with a list of owners, as if they're not in COMMITTED
+ # state, they're already being committed asynchronously.
+ if isinstance(v.owner, ArvadosFile):
+ v.owner.flush(sync=False)
with self.lock:
if self._put_queue is not None:
# flush again with sync=True to remove committed bufferblocks from
# the segments.
if v.owner:
- v.owner.flush(sync=True)
+ if isinstance(v.owner, ArvadosFile):
+ v.owner.flush(sync=True)
+ elif isinstance(v.owner, list) and len(v.owner) > 0:
+ # This bufferblock is referenced by many files as a result
+ # of repacking small blocks, so don't delete it when flushing
+ # its owners, just do it after flushing them all.
+ for owner in v.owner:
+ owner.flush(sync=True)
+ self.delete_bufferblock(k)
def block_prefetch(self, locator):
"""Initiate a background download of a block.
if not self.prefetch_enabled:
return
- if self._keep.get_from_cache(locator) is not None:
- return
-
with self.lock:
if locator in self._bufferblocks:
return
"""
+ __slots__ = ('parent', 'name', '_writers', '_committed',
+ '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
def __init__(self, parent, name, stream=[], segments=[]):
"""
ArvadosFile constructor.
return True
return False
+ @synchronized
+ def has_remote_blocks(self):
+ """Returns True if any of the segment's locators has a +R signature"""
+
+ for s in self._segments:
+ if '+R' in s.locator:
+ return True
+ return False
+
+ @synchronized
+ def _copy_remote_blocks(self, remote_blocks={}):
+ """Ask Keep to copy remote blocks and point to their local copies.
+
+ This is called from the parent Collection.
+
+ :remote_blocks:
+ Shared cache of remote to local block mappings. This is used to avoid
+ doing extra work when blocks are shared by more than one file in
+ different subdirectories.
+ """
+
+ for s in self._segments:
+ if '+R' in s.locator:
+ try:
+ loc = remote_blocks[s.locator]
+ except KeyError:
+ loc = self.parent._my_keep().refresh_signature(s.locator)
+ remote_blocks[s.locator] = loc
+ s.locator = loc
+ self.parent.set_committed(False)
+ return remote_blocks
+
@synchronized
def segments(self):
return copy.copy(self._segments)
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
locs = set()
data = []
return b''.join(data)
- def _repack_writes(self, num_retries):
- """Optimize buffer block by repacking segments in file sequence.
-
- When the client makes random writes, they appear in the buffer block in
- the sequence they were written rather than the sequence they appear in
- the file. This makes for inefficient, fragmented manifests. Attempt
- to optimize by repacking writes in file sequence.
-
- """
- segs = self._segments
-
- # Collect the segments that reference the buffer block.
- bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-
- # Collect total data referenced by segments (could be smaller than
- # bufferblock size if a portion of the file was written and
- # then overwritten).
- write_total = sum([s.range_size for s in bufferblock_segs])
-
- if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
- # If there's more than one segment referencing this block, it is
- # due to out-of-order writes and will produce a fragmented
- # manifest, so try to optimize by re-packing into a new buffer.
- contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
- new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
- for t in bufferblock_segs:
- new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
- t.segment_offset = new_bb.size() - t.range_size
-
- self._current_bblock = new_bb
-
@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes(num_retries)
+ self._current_bblock.repack_writes()
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
if self._current_bblock.state() == _BufferBlock.WRITABLE:
- self._repack_writes(num_retries)
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
+ self._current_bblock.repack_writes()
+ if self._current_bblock.state() != _BufferBlock.DELETED:
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
if sync:
to_delete = set()
to_delete.add(s.locator)
s.locator = bb.locator()
for s in to_delete:
- self.parent._my_block_manager().delete_bufferblock(s)
+ # Don't delete the bufferblock if it's owned by many files. It'll be
+ # deleted after all of its owners are flush()ed.
+ if self.parent._my_block_manager().get_bufferblock(s).owner is self:
+ self.parent._my_block_manager().delete_bufferblock(s)
self.parent.notify(MOD, self.parent, self.name, (self, self))
normalize=False, only_committed=False):
buf = ""
filestream = []
- for segment in self.segments:
+ for segment in self._segments:
loc = segment.locator
if self.parent._my_block_manager().is_bufferblock(loc):
if only_committed:
continue
- loc = self._bufferblocks[loc].calculate_locator()
+ loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
- filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+ filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
segment.segment_offset, segment.range_size))
- buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
+ buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
buf += "\n"
return buf
def stream_name(self):
return self.arvadosfile.parent.stream_name()
+ def readinto(self, b):
+ data = self.read(len(b))
+ b[:len(data)] = data
+ return len(data)
+
@_FileLikeObjectBase._before_close
@retry_method
def read(self, size=None, num_retries=None):
if not self.closed:
self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()
+
+
+class WrappableFile(object):
+ """An interface to an Arvados file that's compatible with io wrappers.
+
+ """
+ def __init__(self, f):
+ self.f = f
+ self.closed = False
+ def close(self):
+ self.closed = True
+ return self.f.close()
+ def flush(self):
+ return self.f.flush()
+ def read(self, *args, **kwargs):
+ return self.f.read(*args, **kwargs)
+ def readable(self):
+ return self.f.readable()
+ def readinto(self, *args, **kwargs):
+ return self.f.readinto(*args, **kwargs)
+ def seek(self, *args, **kwargs):
+ return self.f.seek(*args, **kwargs)
+ def seekable(self):
+ return self.f.seekable()
+ def tell(self):
+ return self.f.tell()
+ def writable(self):
+ return self.f.writable()
+ def write(self, *args, **kwargs):
+ return self.f.write(*args, **kwargs)