X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8e6cd14b7884a691a110110b0f366577437c6d9e..0e0c1400b57d5de8aa8c18dd4897527f905a4b42:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 4cc2591ebb..c6cb1c91cc 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -1,11 +1,17 @@ +from __future__ import absolute_import +from __future__ import division +from future import standard_library +standard_library.install_aliases() +from builtins import range +from builtins import object import functools import os import zlib import bz2 -import config +from . import config import hashlib import threading -import Queue +import queue import copy import errno import re @@ -38,6 +44,12 @@ def split(path): stream_name, file_name = '.', path return stream_name, file_name + +class UnownedBlockError(Exception): + """Raised when there's an writable block without an owner on the BlockManager.""" + pass + + class _FileLikeObjectBase(object): def __init__(self, name, mode): self.name = name @@ -70,7 +82,7 @@ class _FileLikeObjectBase(object): class ArvadosFileReaderBase(_FileLikeObjectBase): def __init__(self, name, mode, num_retries=None): super(ArvadosFileReaderBase, self).__init__(name, mode) - self._filepos = 0L + self._filepos = 0 self.num_retries = num_retries self._readline_cache = (None, None) @@ -90,7 +102,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): pos += self._filepos elif whence == os.SEEK_END: pos += self.size() - self._filepos = min(max(pos, 0L), self.size()) + self._filepos = min(max(pos, 0), self.size()) def tell(self): return self._filepos @@ -207,8 +219,8 @@ class StreamFileReader(ArvadosFileReaderBase): if available_chunks: lr = available_chunks[0] data = self._stream.readfrom(lr.locator+lr.segment_offset, - lr.segment_size, - num_retries=num_retries) + lr.segment_size, + num_retries=num_retries) self._filepos += len(data) return data @@ -404,7 +416,7 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep, copies=None): + def __init__(self, keep, copies=None, put_threads=None): """keep: KeepClient object to use""" self._keep = keep self._bufferblocks = collections.OrderedDict() @@ -414,7 +426,10 @@ class _BlockManager(object): self._prefetch_threads = None self.lock = threading.Lock() self.prefetch_enabled = True - self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS + if put_threads: + self.num_put_threads = put_threads + else: + self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS self.copies = copies self._pending_write_size = 0 @@ -498,10 +513,10 @@ class _BlockManager(object): # blocks pending. If they are full 64 MiB blocks, that means up to # 256 MiB of internal buffering, which is the same size as the # default download block cache in KeepClient. - self._put_queue = Queue.Queue(maxsize=2) + self._put_queue = queue.Queue(maxsize=2) self._put_threads = [] - for i in xrange(0, self.num_put_threads): + for i in range(0, self.num_put_threads): thread = threading.Thread(target=self._commit_bufferblock_worker) self._put_threads.append(thread) thread.daemon = True @@ -521,9 +536,9 @@ class _BlockManager(object): @synchronized def start_get_threads(self): if self._prefetch_threads is None: - self._prefetch_queue = Queue.Queue() + self._prefetch_queue = queue.Queue() self._prefetch_threads = [] - for i in xrange(0, self.num_get_threads): + for i in range(0, self.num_get_threads): thread = threading.Thread(target=self._block_prefetch_worker) self._prefetch_threads.append(thread) thread.daemon = True @@ -568,7 +583,11 @@ class _BlockManager(object): # A WRITABLE block always has an owner. # A WRITABLE block with its owner.closed() implies that it's # size is <= KEEP_BLOCK_SIZE/2. - small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + try: + small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + except AttributeError: + # Writable blocks without owner shouldn't exist. + raise UnownedBlockError() if len(small_blocks) <= 1: # Not enough small blocks for repacking @@ -678,7 +697,7 @@ class _BlockManager(object): self.repack_small_blocks(force=True, sync=True) with self.lock: - items = self._bufferblocks.items() + items = list(self._bufferblocks.items()) for k,v in items: if v.state() != _BufferBlock.COMMITTED and v.owner: @@ -798,7 +817,7 @@ class ArvadosFile(object): self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) - self._committed = False + self.set_committed(False) def __eq__(self, other): if other is self: @@ -810,7 +829,7 @@ class ArvadosFile(object): with self.lock: if len(self._segments) != len(othersegs): return False - for i in xrange(0, len(othersegs)): + for i in range(0, len(othersegs)): seg1 = self._segments[i] seg2 = othersegs[i] loc1 = seg1.locator @@ -838,9 +857,18 @@ class ArvadosFile(object): self._segments = segs @synchronized - def set_committed(self): - """Set committed flag to True""" - self._committed = True + def set_committed(self, value=True): + """Set committed flag. + + If value is True, set committed to be True. + + If value is False, set committed to be False for this and all parents. + """ + if value == self._committed: + return + self._committed = value + if self._committed is False and self.parent is not None: + self.parent.set_committed(False) @synchronized def committed(self): @@ -861,7 +889,7 @@ class ArvadosFile(object): """ self._writers.remove(writer) - if flush or self.size() > config.KEEP_BLOCK_SIZE / 2: + if flush or self.size() > config.KEEP_BLOCK_SIZE // 2: # File writer closed, not small enough for repacking self.flush() elif self.closed(): @@ -901,7 +929,7 @@ class ArvadosFile(object): new_segs.append(r) self._segments = new_segs - self._committed = False + self.set_committed(False) elif size > self.size(): raise IOError(errno.EINVAL, "truncate() does not support extending the file size") @@ -988,7 +1016,7 @@ class ArvadosFile(object): n += config.KEEP_BLOCK_SIZE return - self._committed = False + self.set_committed(False) if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) @@ -1051,7 +1079,7 @@ class ArvadosFile(object): def _add_segment(self, blocks, pos, size): """Internal implementation of add_segment.""" - self._committed = False + self.set_committed(False) for lr in locators_and_ranges(blocks, pos, size): last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) @@ -1067,12 +1095,15 @@ class ArvadosFile(object): return 0 @synchronized - def manifest_text(self, stream_name=".", portable_locators=False, normalize=False): + def manifest_text(self, stream_name=".", portable_locators=False, + normalize=False, only_committed=False): buf = "" filestream = [] for segment in self.segments: loc = segment.locator - if loc.startswith("bufferblock"): + if self.parent._my_block_manager().is_bufferblock(loc): + if only_committed: + continue loc = self._bufferblocks[loc].calculate_locator() if portable_locators: loc = KeepLocator(loc).stripped() @@ -1085,7 +1116,7 @@ class ArvadosFile(object): @must_be_writable @synchronized def _reparent(self, newparent, newname): - self._committed = False + self.set_committed(False) self.flush(sync=True) self.parent.remove(self.name) self.parent = newparent