From 2d6c425e78bc5712c63b4ebecb05077b0e30da1f Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 29 Mar 2022 12:24:53 -0400 Subject: [PATCH] 18941: Add --threads option to arv-get Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/python/arvados/arvfile.py | 13 +++++-------- sdk/python/arvados/collection.py | 11 +++++++++-- sdk/python/arvados/commands/get.py | 13 ++++++++++++- sdk/python/arvados/keep.py | 3 +-- 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index b21ebd3317..fbf593d026 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -479,9 +479,9 @@ class _BlockManager(object): """ DEFAULT_PUT_THREADS = 2 - DEFAULT_GET_THREADS = 4 + DEFAULT_GET_THREADS = 2 - def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None): + def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None): """keep: KeepClient object to use""" self._keep = keep self._bufferblocks = collections.OrderedDict() @@ -492,7 +492,7 @@ class _BlockManager(object): self.lock = threading.Lock() self.prefetch_enabled = True self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS - self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS self.copies = copies self.storage_classes = storage_classes_func or (lambda: []) self._pending_write_size = 0 @@ -1103,7 +1103,7 @@ class ArvadosFile(object): if size == 0 or offset >= self.size(): return b'' readsegs = locators_and_ranges(self._segments, offset, size) - prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32) + prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32) locs = set() data = [] @@ -1121,10 +1121,7 @@ class ArvadosFile(object): self.parent._my_block_manager().block_prefetch(lr.locator) locs.add(lr.locator) - if len(data) == 1: - return data[0] - else: - return b''.join(data) + return b''.join(data) @must_be_writable @synchronized diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index a076de6baf..a44d42b6ac 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -1262,7 +1262,8 @@ class Collection(RichCollectionBase): block_manager=None, replication_desired=None, storage_classes_desired=None, - put_threads=None): + put_threads=None, + get_threads=None): """Collection constructor. :manifest_locator_or_text: @@ -1311,6 +1312,7 @@ class Collection(RichCollectionBase): self.replication_desired = replication_desired self._storage_classes_desired = storage_classes_desired self.put_threads = put_threads + self.get_threads = get_threads if apiconfig: self._config = apiconfig @@ -1424,7 +1426,12 @@ class Collection(RichCollectionBase): copies = (self.replication_desired or self._my_api()._rootDesc.get('defaultCollectionReplication', 2)) - self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired) + self._block_manager = _BlockManager(self._my_keep(), + copies=copies, + put_threads=self.put_threads, + num_retries=self.num_retries, + storage_classes_func=self.storage_classes_desired, + get_threads=self.get_threads,) return self._block_manager def _remember_api_response(self, response): diff --git a/sdk/python/arvados/commands/get.py b/sdk/python/arvados/commands/get.py index eb68297625..a377c149df 100755 --- a/sdk/python/arvados/commands/get.py +++ b/sdk/python/arvados/commands/get.py @@ -98,6 +98,15 @@ When getting a collection manifest, strip its access tokens before writing it. """) +parser.add_argument('--threads', type=int, metavar='N', default=2, + help=""" +Set the number of download threads to be used. Take into account that +using lots of threads will increase the RAM requirements. Default is +to use 2 threads. +On high latency installations, using a greater number will improve +overall throughput. +""") + def parse_arguments(arguments, stdout, stderr): args = parser.parse_args(arguments) @@ -191,7 +200,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): try: reader = arvados.CollectionReader( - col_loc, api_client=api_client, num_retries=args.retries) + col_loc, api_client=api_client, num_retries=args.retries, + keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)), + get_threads=args.threads) except Exception as error: logger.error("failed to read collection: {}".format(error)) return 1 diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index df01c3a55b..94104586de 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -176,7 +176,7 @@ class Keep(object): class KeepBlockCache(object): # Default RAM cache is 256MiB - def __init__(self, cache_max=(1024 * 1024 * 1024)): + def __init__(self, cache_max=(256 * 1024 * 1024)): self.cache_max = cache_max self._cache = [] self._cache_lock = threading.Lock() @@ -1337,4 +1337,3 @@ class KeepClient(object): return True if os.path.exists(os.path.join(self.local_store, locator.md5sum)): return True - -- 2.30.2