X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2656053035f46e6097b380f5c0da7d54b04a60c4..dfd3260e8b32ac4b9640e3e33bf8cf1e581bd917:/sdk/python/arvados/arvfile.py

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index c394dab810..9db19b05f6 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -38,6 +38,12 @@ def split(path):
         stream_name, file_name = '.', path
     return stream_name, file_name
 
+
+class UnownedBlockError(Exception):
+    """Raised when there's a writable block without an owner on the BlockManager."""
+    pass
+
+
 class _FileLikeObjectBase(object):
     def __init__(self, name, mode):
         self.name = name
@@ -404,7 +410,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None):
+    def __init__(self, keep, copies=None, put_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -414,9 +420,14 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
-        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+        if put_threads:
+            self.num_put_threads = put_threads
+        else:
+            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
+        self._pending_write_size = 0
+        self.threads_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
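
Note: the put_threads override added above falls back to DEFAULT_PUT_THREADS via a truthiness check, so passing 0 (or any other falsy value) also selects the default. A minimal standalone sketch of that pattern, not Arvados code:

    class Manager(object):
        DEFAULT_PUT_THREADS = 2

        def __init__(self, put_threads=None):
            # Truthiness test: None and 0 both fall back to the class default.
            if put_threads:
                self.num_put_threads = put_threads
            else:
                self.num_put_threads = Manager.DEFAULT_PUT_THREADS

    print(Manager().num_put_threads)                 # 2 (default)
    print(Manager(put_threads=8).num_put_threads)    # 8
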
@@ -482,28 +493,28 @@ class _BlockManager(object):
         if self._put_queue is not None:
             self._put_queue.task_done()
 
-    @synchronized
     def start_put_threads(self):
-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending. If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-
-            self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
-                thread = threading.Thread(target=self._commit_bufferblock_worker)
-                self._put_threads.append(thread)
-                thread.daemon = True
-                thread.start()
+        with self.threads_lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending. If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+
+                self._put_threads = []
+                for i in xrange(0, self.num_put_threads):
+                    thread = threading.Thread(target=self._commit_bufferblock_worker)
+                    self._put_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
 
     def _block_prefetch_worker(self):
         """The background downloader thread."""
@@ -514,7 +525,7 @@ class _BlockManager(object):
                 return
             self._keep.get(b)
         except Exception:
-            pass
+            _logger.exception("Exception doing block prefetch")
 
     @synchronized
     def start_get_threads(self):
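
Note: replacing @synchronized with the dedicated threads_lock lets start_put_threads run without taking the manager's main lock, while checking _put_threads inside the lock still guarantees the uploader pool is created at most once. A standalone sketch of that lazy-start pattern, using only the standard library:

    import threading

    class LazyWorkers(object):
        def __init__(self, num_threads=2):
            self._threads = None
            self.threads_lock = threading.Lock()  # guards thread startup only
            self.num_threads = num_threads

        def start_threads(self, target):
            with self.threads_lock:
                if self._threads is None:  # re-checked under the lock: runs once
                    self._threads = []
                    for _ in range(self.num_threads):
                        t = threading.Thread(target=target)
                        t.daemon = True    # don't block interpreter exit
                        t.start()
                        self._threads.append(t)
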
@@ -555,24 +566,38 @@ class _BlockManager(object):
         self.stop_threads()
 
     @synchronized
-    def repack_small_blocks(self, force=False, sync=False):
+    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
-        # Search blocks ready for getting packed together before being committed to Keep.
-        # A WRITABLE block always has an owner.
-        # A WRITABLE block with its owner.closed() implies that it's
-        # size is <= KEEP_BLOCK_SIZE/2.
-        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-        if len(small_blocks) <= 1:
-            # Not enough small blocks for repacking
-            return
+        self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        pending_write_size = sum([b.size() for b in small_blocks])
-        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+            # Search blocks ready for getting packed together before being committed to Keep.
+            # A WRITABLE block always has an owner.
+            # A WRITABLE block with its owner.closed() implies that its
+            # size is <= KEEP_BLOCK_SIZE/2.
+            try:
+                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+            except AttributeError:
+                # Writable blocks without an owner shouldn't exist.
+                raise UnownedBlockError()
+
+            if len(small_blocks) <= 1:
+                # Not enough small blocks for repacking
+                return
+
+            # Update the pending write size count with its true value, just in case
+            # some small file was opened, written and closed several times.
+            self._pending_write_size = sum([b.size() for b in small_blocks])
+            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+                return
+
             new_bb = self._alloc_bufferblock()
             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
                 arvfile = bb.owner
+                self._pending_write_size -= bb.size()
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                 arvfile.set_segments([Range(new_bb.blockid,
                                             0,
@@ -747,6 +772,14 @@ class ArvadosFile(object):
     def writable(self):
         return self.parent.writable()
 
+    @synchronized
+    def permission_expired(self, as_of_dt=None):
+        """Returns True if any of the segments' locators is expired"""
+        for r in self._segments:
+            if KeepLocator(r.locator).permission_expired(as_of_dt):
+                return True
+        return False
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
@@ -778,7 +811,7 @@ class ArvadosFile(object):
             self._segments.append(Range(new_loc, other_segment.range_start,
                                         other_segment.range_size,
                                         other_segment.segment_offset))
-        self._committed = False
+        self.set_committed(False)
 
     def __eq__(self, other):
         if other is self:
@@ -818,9 +851,18 @@ class ArvadosFile(object):
         self._segments = segs
 
     @synchronized
-    def set_committed(self):
-        """Set committed flag to True"""
-        self._committed = True
+    def set_committed(self, value=True):
+        """Set committed flag.
+
+        If value is True, set committed to be True.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        self._committed = value
+        if self._committed is False and self.parent is not None:
+            self.parent.set_committed(False)
 
     @synchronized
     def committed(self):
@@ -846,7 +888,7 @@ class ArvadosFile(object):
             self.flush()
         elif self.closed():
             # All writers closed and size is adequate for repacking
-            self.parent._my_block_manager().repack_small_blocks()
+            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 
     def closed(self):
         """
@@ -881,7 +923,7 @@ class ArvadosFile(object):
                     new_segs.append(r)
 
             self._segments = new_segs
-            self._committed = False
+            self.set_committed(False)
         elif size > self.size():
             raise IOError(errno.EINVAL,
                           "truncate() does not support extending the file size")
@@ -968,7 +1010,7 @@ class ArvadosFile(object):
                 n += config.KEEP_BLOCK_SIZE
             return
 
-        self._committed = False
+        self.set_committed(False)
 
         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -1031,7 +1073,7 @@ class ArvadosFile(object):
 
     def _add_segment(self, blocks, pos, size):
         """Internal implementation of add_segment."""
-        self._committed = False
+        self.set_committed(False)
         for lr in locators_and_ranges(blocks, pos, size):
             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
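
Note: set_committed(False) now dirties the whole parent chain, so a modified file marks every enclosing collection as needing a flush, while set_committed(True) touches only the object itself. A minimal sketch of that upward propagation, not Arvados code:

    class Node(object):
        def __init__(self, parent=None):
            self.parent = parent
            self._committed = True

        def set_committed(self, value=True):
            if value == self._committed:
                return                      # unchanged: stop the upward walk early
            self._committed = value
            if self._committed is False and self.parent is not None:
                self.parent.set_committed(False)

    root = Node()
    child = Node(parent=root)
    child.set_committed(False)
    assert root._committed is False         # dirtiness propagated to the ancestor
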
@@ -1047,12 +1089,15 @@ class ArvadosFile(object):
         return 0
 
     @synchronized
-    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+    def manifest_text(self, stream_name=".", portable_locators=False,
+                      normalize=False, only_committed=False):
         buf = ""
         filestream = []
         for segment in self.segments:
             loc = segment.locator
-            if loc.startswith("bufferblock"):
+            if self.parent._my_block_manager().is_bufferblock(loc):
+                if only_committed:
+                    continue
                 loc = self._bufferblocks[loc].calculate_locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
@@ -1065,7 +1110,7 @@ class ArvadosFile(object):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
-        self._committed = False
+        self.set_committed(False)
         self.flush(sync=True)
         self.parent.remove(self.name)
         self.parent = newparent
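
Note: the only_committed flag makes manifest_text skip segments that still live in uncommitted buffer blocks instead of computing provisional locators for them. A simplified standalone sketch of that filter (the predicate and locator values here are illustrative, not the SDK's):

    def committed_segments(locators, is_bufferblock, only_committed=False):
        out = []
        for loc in locators:
            if is_bufferblock(loc):
                if only_committed:
                    continue                        # unflushed data: leave it out
                loc = '<locator-of-buffered-data>'  # stand-in for calculate_locator()
            out.append(loc)
        return out

    segs = ['bufferblock0', 'd41d8cd98f00b204e9800998ecf8427e+0']
    print(committed_segments(segs, lambda l: l.startswith('bufferblock'),
                             only_committed=True))
    # ['d41d8cd98f00b204e9800998ecf8427e+0']
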