18941: Add --threads option to arv-get
author Peter Amstutz <peter.amstutz@curii.com>
Tue, 29 Mar 2022 16:24:53 +0000 (12:24 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Tue, 29 Mar 2022 16:24:53 +0000 (12:24 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/commands/get.py
sdk/python/arvados/keep.py

index b21ebd331769109ca0cca3b23e2c45f17ed2c9ed..fbf593d02619810b1647f78520217ac6a9b4bef7 100644 (file)
@@ -479,9 +479,9 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 4
+    DEFAULT_GET_THREADS = 2
 
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -492,7 +492,7 @@ class _BlockManager(object):
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -1103,7 +1103,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
 
         locs = set()
         data = []
 
         locs = set()
         data = []
@@ -1121,10 +1121,7 @@ class ArvadosFile(object):
                 self.parent._my_block_manager().block_prefetch(lr.locator)
                 locs.add(lr.locator)
 
                 self.parent._my_block_manager().block_prefetch(lr.locator)
                 locs.add(lr.locator)
 
-        if len(data) == 1:
-            return data[0]
-        else:
-            return b''.join(data)
+        return b''.join(data)
 
     @must_be_writable
     @synchronized
 
     @must_be_writable
     @synchronized
index a076de6baf622f560f92859db68e7e8cdafc65f9..a44d42b6ac7cd7a4156ab3b8bc4f72f86060e3a0 100644 (file)
@@ -1262,7 +1262,8 @@ class Collection(RichCollectionBase):
                  block_manager=None,
                  replication_desired=None,
                  storage_classes_desired=None,
                  block_manager=None,
                  replication_desired=None,
                  storage_classes_desired=None,
-                 put_threads=None):
+                 put_threads=None,
+                 get_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1311,6 +1312,7 @@ class Collection(RichCollectionBase):
         self.replication_desired = replication_desired
         self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
         self.replication_desired = replication_desired
         self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
+        self.get_threads = get_threads
 
         if apiconfig:
             self._config = apiconfig
 
         if apiconfig:
             self._config = apiconfig
@@ -1424,7 +1426,12 @@ class Collection(RichCollectionBase):
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+            self._block_manager = _BlockManager(self._my_keep(),
+                                                copies=copies,
+                                                put_threads=self.put_threads,
+                                                num_retries=self.num_retries,
+                                                storage_classes_func=self.storage_classes_desired,
+                                                get_threads=self.get_threads,)
         return self._block_manager
 
     def _remember_api_response(self, response):
         return self._block_manager
 
     def _remember_api_response(self, response):
index eb682976253b66a3c0c6c0ebd0a9dc5ca306d499..a377c149df15cc6e09ae6c2fdd56c97b5639cf40 100755 (executable)
@@ -98,6 +98,15 @@ When getting a collection manifest, strip its access tokens before writing
 it.
 """)
 
 it.
 """)
 
+parser.add_argument('--threads', type=int, metavar='N', default=2,
+                    help="""
+Set the number of download threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 2 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
 def parse_arguments(arguments, stdout, stderr):
     args = parser.parse_args(arguments)
 
 def parse_arguments(arguments, stdout, stderr):
     args = parser.parse_args(arguments)
 
@@ -191,7 +200,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     try:
         reader = arvados.CollectionReader(
 
     try:
         reader = arvados.CollectionReader(
-            col_loc, api_client=api_client, num_retries=args.retries)
+            col_loc, api_client=api_client, num_retries=args.retries,
+            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
+            get_threads=args.threads)
     except Exception as error:
         logger.error("failed to read collection: {}".format(error))
         return 1
     except Exception as error:
         logger.error("failed to read collection: {}".format(error))
         return 1
index df01c3a55b74e3e1792f1c8e0b0d1846e45e20ee..94104586deb46a4c24c05c41e683a50b28c69d1d 100644 (file)
@@ -176,7 +176,7 @@ class Keep(object):
 
 class KeepBlockCache(object):
     # Default RAM cache is 256MiB
 
 class KeepBlockCache(object):
     # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(1024 * 1024 * 1024)):
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
@@ -1337,4 +1337,3 @@ class KeepClient(object):
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
-