X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cc8174aca759773fa661bd9172f877b2d639576b..150de14b0b265d86df11a04201320944d04fe3a5:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 6893b94bf7..2ce0e46b30 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -481,7 +481,7 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep, copies=None, put_threads=None, num_retries=None): + def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None): """keep: KeepClient object to use""" self._keep = keep self._bufferblocks = collections.OrderedDict() @@ -491,12 +491,10 @@ class _BlockManager(object): self._prefetch_threads = None self.lock = threading.Lock() self.prefetch_enabled = True - if put_threads: - self.num_put_threads = put_threads - else: - self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS - self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS + self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS self.copies = copies + self.storage_classes = storage_classes_func or (lambda: []) self._pending_write_size = 0 self.threads_lock = threading.Lock() self.padding_block = None @@ -555,9 +553,9 @@ class _BlockManager(object): return if self.copies is None: - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries) + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes()) else: - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies) + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()) bufferblock.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: bufferblock.set_state(_BufferBlock.ERROR, e) @@ -572,7 +570,7 @@ class _BlockManager(object): # If we don't limit the Queue size, the upload queue can quickly # grow to take up gigabytes of RAM if the writing process is - # generating data more quickly than it can be send to the Keep + # generating data more quickly than it can be sent to the Keep # servers. # # With two upload threads and a queue size of 2, this means up to 4 @@ -595,7 +593,7 @@ class _BlockManager(object): b = self._prefetch_queue.get() if b is None: return - self._keep.get(b) + self._keep.get(b, prefetch=True) except Exception: _logger.exception("Exception doing block prefetch") @@ -726,9 +724,9 @@ class _BlockManager(object): if sync: try: if self.copies is None: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries) + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes()) else: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies) + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()) block.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: block.set_state(_BufferBlock.ERROR, e) @@ -762,9 +760,10 @@ class _BlockManager(object): self._delete_bufferblock(locator) def _delete_bufferblock(self, locator): - bb = self._bufferblocks[locator] - bb.clear() - del self._bufferblocks[locator] + if locator in self._bufferblocks: + bb = self._bufferblocks[locator] + bb.clear() + del self._bufferblocks[locator] def get_block_contents(self, locator, num_retries, cache_only=False): """Fetch a block. @@ -842,9 +841,6 @@ class _BlockManager(object): if not self.prefetch_enabled: return - if self._keep.get_from_cache(locator) is not None: - return - with self.lock: if locator in self._bufferblocks: return @@ -1100,7 +1096,7 @@ class ArvadosFile(object): if size == 0 or offset >= self.size(): return b'' readsegs = locators_and_ranges(self._segments, offset, size) - prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32) + prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32) locs = set() data = []