X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/664cbad59d3fff30efb7e19c73fa57120a7672b0..1f54a6762575e1ff8da9861277dd8e44a7e87caf:/sdk/python/arvados/arvfile.py

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 3281d78e20..e0e972b5c1 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -88,9 +88,6 @@ class _FileLikeObjectBase(object):
 class ArvadosFileReaderBase(_FileLikeObjectBase):
     def __init__(self, name, mode, num_retries=None):
         super(ArvadosFileReaderBase, self).__init__(name, mode)
-        self._binary = 'b' in mode
-        if sys.version_info >= (3, 0) and not self._binary:
-            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
         self._filepos = 0
         self.num_retries = num_retries
         self._readline_cache = (None, None)
@@ -103,7 +100,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             yield data
 
     def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self.name)
+        return re.sub(r'\.(bz2|gz)$', '', self.name)
 
     @_FileLikeObjectBase._before_close
     def seek(self, pos, whence=os.SEEK_SET):
@@ -482,27 +479,26 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None):
+    def __init__(self, keep,
+                 copies=None,
+                 put_threads=None,
+                 num_retries=None,
+                 storage_classes_func=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
-        self._prefetch_queue = None
-        self._prefetch_threads = None
         self.lock = threading.Lock()
-        self.prefetch_enabled = True
-        if put_threads:
-            self.num_put_threads = put_threads
-        else:
-            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.prefetch_lookahead = self._keep.num_prefetch_threads
+        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
         self.copies = copies
+        self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
+        self.num_retries = num_retries
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -557,9 +553,9 @@ class _BlockManager(object):
                     return
 
                 if self.copies is None:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
@@ -574,7 +570,7 @@ class _BlockManager(object):
 
             # If we don't limit the Queue size, the upload queue can quickly
             # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
+            # generating data more quickly than it can be sent to the Keep
             # servers.
             #
             # With two upload threads and a queue size of 2, this means up to 4
@@ -590,29 +586,6 @@ class _BlockManager(object):
                 thread.daemon = True
                 thread.start()
 
-    def _block_prefetch_worker(self):
-        """The background downloader thread."""
-        while True:
-            try:
-                b = self._prefetch_queue.get()
-                if b is None:
-                    return
-                self._keep.get(b)
-            except Exception:
-                _logger.exception("Exception doing block prefetch")
-
-    @synchronized
-    def start_get_threads(self):
-        if self._prefetch_threads is None:
-            self._prefetch_queue = queue.Queue()
-            self._prefetch_threads = []
-            for i in range(0, self.num_get_threads):
-                thread = threading.Thread(target=self._block_prefetch_worker)
-                self._prefetch_threads.append(thread)
-                thread.daemon = True
-                thread.start()
-
-
     @synchronized
     def stop_threads(self):
         """Shut down and wait for background upload and download threads to finish."""
@@ -625,14 +598,6 @@ class _BlockManager(object):
         self._put_threads = None
         self._put_queue = None
 
-        if self._prefetch_threads is not None:
-            for t in self._prefetch_threads:
-                self._prefetch_queue.put(None)
-            for t in self._prefetch_threads:
-                t.join()
-            self._prefetch_threads = None
-            self._prefetch_queue = None
-
     def __enter__(self):
         return self
 
@@ -728,9 +693,9 @@ class _BlockManager(object):
         if sync:
             try:
                 if self.copies is None:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
@@ -764,9 +729,10 @@ class _BlockManager(object):
             self._delete_bufferblock(locator)
 
     def _delete_bufferblock(self, locator):
-        bb = self._bufferblocks[locator]
-        bb.clear()
-        del self._bufferblocks[locator]
+        if locator in self._bufferblocks:
+            bb = self._bufferblocks[locator]
+            bb.clear()
+            del self._bufferblocks[locator]
 
     def get_block_contents(self, locator, num_retries, cache_only=False):
         """Fetch a block.
@@ -831,28 +797,20 @@ class _BlockManager(object):
                     owner.flush(sync=True)
                     self.delete_bufferblock(k)
 
+        self.stop_threads()
+
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
-
-        This assumes that the underlying KeepClient implements a block cache,
-        so repeated requests for the same block will not result in repeated
-        downloads (unless the block is evicted from the cache.)  This method
-        does not block.
- """ - if not self.prefetch_enabled: - return - - if self._keep.get_from_cache(locator) is not None: + if not self.prefetch_lookahead: return with self.lock: if locator in self._bufferblocks: return - self.start_get_threads() - self._prefetch_queue.put(locator) + self._keep.block_prefetch(locator) class ArvadosFile(object): @@ -867,7 +825,7 @@ class ArvadosFile(object): """ __slots__ = ('parent', 'name', '_writers', '_committed', - '_segments', 'lock', '_current_bblock', 'fuse_entry') + '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter') def __init__(self, parent, name, stream=[], segments=[]): """ @@ -888,6 +846,7 @@ class ArvadosFile(object): for s in segments: self._add_segment(stream, s.locator, s.range_size) self._current_bblock = None + self._read_counter = 0 def writable(self): return self.parent.writable() @@ -1102,7 +1061,25 @@ class ArvadosFile(object): if size == 0 or offset >= self.size(): return b'' readsegs = locators_and_ranges(self._segments, offset, size) - prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32) + + prefetch = None + prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead + if prefetch_lookahead: + # Doing prefetch on every read() call is surprisingly expensive + # when we're trying to deliver data at 600+ MiBps and want + # the read() fast path to be as lightweight as possible. + # + # Only prefetching every 128 read operations + # dramatically reduces the overhead while still + # getting the benefit of prefetching (e.g. when + # reading 128 KiB at a time, it checks for prefetch + # every 16 MiB). + self._read_counter = (self._read_counter+1) % 128 + if self._read_counter == 1: + prefetch = locators_and_ranges(self._segments, + offset + size, + config.KEEP_BLOCK_SIZE * prefetch_lookahead, + limit=(1+prefetch_lookahead)) locs = set() data = [] @@ -1110,17 +1087,21 @@ class ArvadosFile(object): block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact)) if block: blockview = memoryview(block) - data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes()) + data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size]) locs.add(lr.locator) else: break - for lr in prefetch: - if lr.locator not in locs: - self.parent._my_block_manager().block_prefetch(lr.locator) - locs.add(lr.locator) + if prefetch: + for lr in prefetch: + if lr.locator not in locs: + self.parent._my_block_manager().block_prefetch(lr.locator) + locs.add(lr.locator) - return b''.join(data) + if len(data) == 1: + return data[0] + else: + return b''.join(data) @must_be_writable @synchronized @@ -1278,6 +1259,11 @@ class ArvadosFileReader(ArvadosFileReaderBase): def stream_name(self): return self.arvadosfile.parent.stream_name() + def readinto(self, b): + data = self.read(len(b)) + b[:len(data)] = data + return len(data) + @_FileLikeObjectBase._before_close @retry_method def read(self, size=None, num_retries=None): @@ -1356,3 +1342,33 @@ class ArvadosFileWriter(ArvadosFileReader): if not self.closed: self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close() + + +class WrappableFile(object): + """An interface to an Arvados file that's compatible with io wrappers. 
+ + """ + def __init__(self, f): + self.f = f + self.closed = False + def close(self): + self.closed = True + return self.f.close() + def flush(self): + return self.f.flush() + def read(self, *args, **kwargs): + return self.f.read(*args, **kwargs) + def readable(self): + return self.f.readable() + def readinto(self, *args, **kwargs): + return self.f.readinto(*args, **kwargs) + def seek(self, *args, **kwargs): + return self.f.seek(*args, **kwargs) + def seekable(self): + return self.f.seekable() + def tell(self): + return self.f.tell() + def writable(self): + return self.f.writable() + def write(self, *args, **kwargs): + return self.f.write(*args, **kwargs)