X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f2388f1bdad27efd2816533aa7da80735ed5ec3f..5c260a4bfcce9f967dc1518bd52aaaa6d6335c60:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 0fcdc1e633..4b95835aac 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -100,7 +100,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): yield data def decompressed_name(self): - return re.sub('\.(bz2|gz)$', '', self.name) + return re.sub(r'\.(bz2|gz)$', '', self.name) @_FileLikeObjectBase._before_close def seek(self, pos, whence=os.SEEK_SET): @@ -479,20 +479,20 @@ class _BlockManager(object): """ DEFAULT_PUT_THREADS = 2 - DEFAULT_GET_THREADS = 2 - def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None): + def __init__(self, keep, + copies=None, + put_threads=None, + num_retries=None, + storage_classes_func=None): """keep: KeepClient object to use""" self._keep = keep self._bufferblocks = collections.OrderedDict() self._put_queue = None self._put_threads = None - self._prefetch_queue = None - self._prefetch_threads = None self.lock = threading.Lock() self.prefetch_enabled = True self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS - self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS self.copies = copies self.storage_classes = storage_classes_func or (lambda: []) self._pending_write_size = 0 @@ -586,29 +586,6 @@ class _BlockManager(object): thread.daemon = True thread.start() - def _block_prefetch_worker(self): - """The background downloader thread.""" - while True: - try: - b = self._prefetch_queue.get() - if b is None: - return - self._keep.get(b) - except Exception: - _logger.exception("Exception doing block prefetch") - - @synchronized - def start_get_threads(self): - if self._prefetch_threads is None: - self._prefetch_queue = queue.Queue() - self._prefetch_threads = [] - for i in range(0, self.num_get_threads): - thread = threading.Thread(target=self._block_prefetch_worker) - self._prefetch_threads.append(thread) - thread.daemon = True - thread.start() - - @synchronized def stop_threads(self): """Shut down and wait for background upload and download threads to finish.""" @@ -621,14 +598,6 @@ class _BlockManager(object): self._put_threads = None self._put_queue = None - if self._prefetch_threads is not None: - for t in self._prefetch_threads: - self._prefetch_queue.put(None) - for t in self._prefetch_threads: - t.join() - self._prefetch_threads = None - self._prefetch_queue = None - def __enter__(self): return self @@ -828,28 +797,20 @@ class _BlockManager(object): owner.flush(sync=True) self.delete_bufferblock(k) + self.stop_threads() + def block_prefetch(self, locator): """Initiate a background download of a block. - - This assumes that the underlying KeepClient implements a block cache, - so repeated requests for the same block will not result in repeated - downloads (unless the block is evicted from the cache.) This method - does not block. - """ if not self.prefetch_enabled: return - if self._keep.get_from_cache(locator) is not None: - return - with self.lock: if locator in self._bufferblocks: return - self.start_get_threads() - self._prefetch_queue.put(locator) + self._keep.block_prefetch(locator) class ArvadosFile(object): @@ -1099,7 +1060,7 @@ class ArvadosFile(object): if size == 0 or offset >= self.size(): return b'' readsegs = locators_and_ranges(self._segments, offset, size) - prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32) + prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32) locs = set() data = []