Merge branch '18941-arv-prefetch' refs #18941
author     Peter Amstutz <peter.amstutz@curii.com>
           Wed, 30 Mar 2022 19:30:24 +0000 (15:30 -0400)
committer  Peter Amstutz <peter.amstutz@curii.com>
           Wed, 30 Mar 2022 19:30:24 +0000 (15:30 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

doc/sdk/cli/subcommands.html.textile.liquid
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/commands/get.py
sdk/python/arvados/keep.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/tests/test_mount.py

diff --git a/doc/sdk/cli/subcommands.html.textile.liquid b/doc/sdk/cli/subcommands.html.textile.liquid
index 50d5d89871a612b368c0c46e966ed5717faa7a6a..5dda77ab5ee65cdf3700be3404f53455c0c25f28 100644 (file)
@@ -212,9 +212,10 @@ This is a frontend to @arv-get@.
 <notextile>
 <pre>
 $ <code class="userinput">arv keep get --help</code>
-usage: arv-get [-h] [--retries RETRIES]
+usage: arv-get [-h] [--retries RETRIES] [--version]
                [--progress | --no-progress | --batch-progress]
-               [--hash HASH | --md5sum] [-n] [-r] [-f | --skip-existing]
+               [--hash HASH | --md5sum] [-n] [-r]
+               [-f | -v | --skip-existing | --strip-manifest] [--threads N]
                locator [destination]
 
 Copy data from Keep to a local file or pipe.
@@ -223,19 +224,20 @@ positional arguments:
   locator            Collection locator, optionally with a file path or
                      prefix.
   destination        Local file or directory where the data is to be written.
-                     Default: /dev/stdout.
+                     Default: stdout.
 
 optional arguments:
   -h, --help         show this help message and exit
   --retries RETRIES  Maximum number of times to retry server requests that
-                     encounter temporary failures (e.g., server down). Default
-                     3.
+                     encounter temporary failures (e.g., server down).
+                     Default 3.
+  --version          Print version and exit.
   --progress         Display human-readable progress on stderr (bytes and, if
                      possible, percentage of total data size). This is the
                      default behavior when it is not expected to interfere
                      with the output: specifically, stderr is a tty _and_
-                     either stdout is not a tty, or output is being written to
-                     named files rather than stdout.
+                     either stdout is not a tty, or output is being written
+                     to named files rather than stdout.
   --no-progress      Do not display human-readable progress on stderr.
   --batch-progress   Display machine-readable progress on stderr (bytes and,
                      if known, total data size).
@@ -252,11 +254,19 @@ optional arguments:
   -f                 Overwrite existing files while writing. The default
                      behavior is to refuse to write *anything* if any of the
                      output files already exist. As a special case, -f is not
-                     needed to write to /dev/stdout.
-  --skip-existing    Skip files that already exist. The default behavior is to
-                     refuse to write *anything* if any files exist that would
-                     have to be overwritten. This option causes even devices,
-                     sockets, and fifos to be skipped.
+                     needed to write to stdout.
+  -v                 Once for verbose mode, twice for debug mode.
+  --skip-existing    Skip files that already exist. The default behavior is
+                     to refuse to write *anything* if any files exist that
+                     would have to be overwritten. This option causes even
+                     devices, sockets, and fifos to be skipped.
+  --strip-manifest   When getting a collection manifest, strip its access
+                     tokens before writing it.
+  --threads N        Set the number of download threads to be used. Note that
+                     using more threads will increase the RAM requirements.
+                     Default is to use 4 threads. On high-latency
+                     installations, a greater number will improve overall
+                     throughput.
 </pre>
 </notextile>
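
For readers driving this from the Python SDK rather than the arv-get CLI, a minimal sketch of the equivalent call (the get_threads parameter and the cache sizing mirror the commands/get.py change further down; the collection UUID is a placeholder, and ARVADOS_API_HOST/ARVADOS_API_TOKEN are assumed to be set in the environment):

import arvados
import arvados.keep

threads = 4  # same default as arv-get --threads
# One spare 64 MiB cache slot beyond the thread count, as in commands/get.py.
block_cache = arvados.keep.KeepBlockCache((threads + 1) * 64 * 1024 * 1024)
reader = arvados.CollectionReader(
    'zzzzz-4zz18-zzzzzzzzzzzzzzz',  # placeholder collection UUID
    keep_client=arvados.keep.KeepClient(block_cache=block_cache),
    get_threads=threads)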
 
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 0fcdc1e6334957f27a5ff1f10fbdedcf2716609a..2ce0e46b30bd67ad948f832183ab091865c2ea53 100644 (file)
@@ -481,7 +481,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -492,7 +492,7 @@ class _BlockManager(object):
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -593,7 +593,7 @@ class _BlockManager(object):
                 b = self._prefetch_queue.get()
                 if b is None:
                     return
-                self._keep.get(b)
+                self._keep.get(b, prefetch=True)
             except Exception:
                 _logger.exception("Exception doing block prefetch")
 
@@ -841,9 +841,6 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
-        if self._keep.get_from_cache(locator) is not None:
-            return
-
         with self.lock:
             if locator in self._bufferblocks:
                 return
@@ -1099,7 +1096,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
 
         locs = set()
         data = []
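
The practical effect of the widened window above: readahead now scales with the number of get threads instead of stopping one block past the current read position. A back-of-the-envelope check, using Keep's 64 MiB maximum block size:

KEEP_BLOCK_SIZE = 64 * 1024 * 1024  # 2**26, as in arvados.config

for num_get_threads in (2, 4, 7):
    window = KEEP_BLOCK_SIZE * num_get_threads
    print(num_get_threads, window // (1024 * 1024), "MiB")
# 2 threads -> 128 MiB, 4 -> 256 MiB, 7 -> 448 MiB of readahead,
# still clipped to at most 32 segments by limit=32.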
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a076de6baf622f560f92859db68e7e8cdafc65f9..a44d42b6ac7cd7a4156ab3b8bc4f72f86060e3a0 100644 (file)
@@ -1262,7 +1262,8 @@ class Collection(RichCollectionBase):
                  block_manager=None,
                  replication_desired=None,
                  storage_classes_desired=None,
-                 put_threads=None):
+                 put_threads=None,
+                 get_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1311,6 +1312,7 @@ class Collection(RichCollectionBase):
         self.replication_desired = replication_desired
         self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
+        self.get_threads = get_threads
 
         if apiconfig:
             self._config = apiconfig
@@ -1424,7 +1426,12 @@ class Collection(RichCollectionBase):
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+            self._block_manager = _BlockManager(self._my_keep(),
+                                                copies=copies,
+                                                put_threads=self.put_threads,
+                                                num_retries=self.num_retries,
+                                                storage_classes_func=self.storage_classes_desired,
+                                                get_threads=self.get_threads)
         return self._block_manager
 
     def _remember_api_response(self, response):
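
A short usage sketch of the widened constructor (the UUID and file name are placeholders; get_threads is simply forwarded to the _BlockManager created above):

import arvados.collection

coll = arvados.collection.Collection(
    'zzzzz-4zz18-zzzzzzzzzzzzzzz',  # placeholder collection UUID
    get_threads=4)                  # new in this commit; put_threads predates it
with coll.open('example.txt', 'rb') as f:  # placeholder file name
    data = f.read()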
diff --git a/sdk/python/arvados/commands/get.py b/sdk/python/arvados/commands/get.py
index eb682976253b66a3c0c6c0ebd0a9dc5ca306d499..c061c70f0eebbac2ed2025fdecd27865c27139b8 100755 (executable)
@@ -98,6 +98,15 @@ When getting a collection manifest, strip its access tokens before writing
 it.
 """)
 
+parser.add_argument('--threads', type=int, metavar='N', default=4,
+                    help="""
+Set the number of download threads to be used. Note that using more
+threads will increase the RAM requirements. Default is to use 4 threads.
+On high-latency installations, a greater number will improve overall
+throughput.
+""")
+
 def parse_arguments(arguments, stdout, stderr):
     args = parser.parse_args(arguments)
 
@@ -191,7 +200,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     try:
         reader = arvados.CollectionReader(
-            col_loc, api_client=api_client, num_retries=args.retries)
+            col_loc, api_client=api_client, num_retries=args.retries,
+            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads + 1) * 64 * 1024 * 1024)),
+            get_threads=args.threads)
     except Exception as error:
         logger.error("failed to read collection: {}".format(error))
         return 1
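
The cache sizing in the hunk above allots one 64 MiB slot per download thread plus one spare for the block currently being consumed, so prefetched blocks are not evicted before they are read. For the default thread count:

threads = 4                                    # arv-get default
cache_bytes = (threads + 1) * 64 * 1024 * 1024
print(cache_bytes // (1024 * 1024))            # 320 (MiB): 5 slots of 64 MiB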
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 1a83eae944c59f8dde5e3a7c63de8bbe9c62a9c9..7c05cc0a6a2c72ca818686b6eea5c6f0a4874d3d 100644 (file)
@@ -1036,9 +1036,10 @@ class KeepClient(object):
         else:
             return None
 
-    def get_from_cache(self, loc):
+    def get_from_cache(self, loc_s):
         """Fetch a block only if is in the cache, otherwise return None."""
-        slot = self.block_cache.get(loc)
+        locator = KeepLocator(loc_s)
+        slot = self.block_cache.get(locator.md5sum)
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
@@ -1057,7 +1058,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1096,6 +1097,13 @@ class KeepClient(object):
             if method == "GET":
                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
                 if not first:
+                    if prefetch:
+                        # This is a prefetch request and the block is
+                        # already in the cache or in flight, so return
+                        # immediately.  Clear 'slot' to prevent the
+                        # finally block from calling slot.set().
+                        slot = None
+                        return None
                     self.hits_counter.add(1)
                     blob = slot.get()
                     if blob is None:
@@ -1332,6 +1340,3 @@ class KeepClient(object):
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
-
-    def is_cached(self, locator):
-        return self.block_cache.reserve_cache(expect_hash)
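
The prefetch=True early-out above is what lets block_prefetch() in arvfile.py drop its get_from_cache() pre-check: deduplication now happens atomically when the cache slot is reserved, instead of in a separate check-then-enqueue step that could race. A stripped-down sketch of the pattern with simplified names (not the real KeepClient API):

import threading

class _Slot:
    def __init__(self):
        self.ready = threading.Event()
        self.content = None

class _Cache:
    def __init__(self):
        self._lock = threading.Lock()
        self._slots = {}

    def reserve_cache(self, key):
        # Atomically find-or-create a slot; 'first' tells the caller
        # whether it is responsible for actually fetching the block.
        with self._lock:
            if key in self._slots:
                return self._slots[key], False
            slot = self._slots[key] = _Slot()
            return slot, True

def get(cache, key, fetch, prefetch=False):
    slot, first = cache.reserve_cache(key)
    if not first:
        if prefetch:
            # Block already cached or in flight: a prefetch has nothing
            # to add, so return at once rather than blocking on
            # slot.ready the way a normal read would.
            return None
        slot.ready.wait()
        return slot.content
    slot.content = fetch(key)  # only the first caller fetches
    slot.ready.set()
    return slot.content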
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 0b8e7c8f8bf2d4615209e0dbdbfd81e6e54b32af..b45a592ecd0fbd1b1d4722bd63f6e1e0b25514dd 100644 (file)
@@ -27,7 +27,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         def __init__(self, blocks):
             self.blocks = blocks
             self.requests = []
-        def get(self, locator, num_retries=0):
+        def get(self, locator, num_retries=0, prefetch=False):
             self.requests.append(locator)
             return self.blocks.get(locator)
         def get_from_cache(self, locator):
@@ -627,6 +627,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
             def __init__(self, blocks, nocache):
                 self.blocks = blocks
                 self.nocache = nocache
+                self.num_get_threads = 1
 
             def block_prefetch(self, loc):
                 pass
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index a43e0d40dfe7ed48f5477689d3623afefe952ba3..5cf4993b2f3804d22209ae16db41fc7bc505efd8 100644 (file)
@@ -320,7 +320,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         def __init__(self, content, num_retries=0):
             self.content = content
 
-        def get(self, locator, num_retries=0):
+        def get(self, locator, num_retries=0, prefetch=False):
             return self.content[locator]
 
     def test_stream_reader(self):
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 7de95a0cb1b0d95bd1d67dcc58b5a3c406a863ff..f3816c0d3e783b6272c5abcc424641a4bb39d6dc 100644 (file)
@@ -525,15 +525,23 @@ class CollectionDirectory(CollectionDirectoryBase):
                         self.collection.update()
                         new_collection_record = self.collection.api_response()
                     else:
+                        # If there are too many prefetch threads and
+                        # the CPU is maxed out, delivering data to the
+                        # FUSE layer actually ends up being slower.
+                        # Experimentally, capping at 7 threads seems
+                        # to be the sweet spot.
+                        get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
                         # Create a new collection object
                         if uuid_pattern.match(self.collection_locator):
                             coll_reader = arvados.collection.Collection(
                                 self.collection_locator, self.api, self.api.keep,
-                                num_retries=self.num_retries)
+                                num_retries=self.num_retries,
+                                get_threads=get_threads)
                         else:
                             coll_reader = arvados.collection.CollectionReader(
                                 self.collection_locator, self.api, self.api.keep,
-                                num_retries=self.num_retries)
+                                num_retries=self.num_retries,
+                                get_threads=get_threads)
                         new_collection_record = coll_reader.api_response() or {}
                         # If the Collection only exists in Keep, there will be no API
                         # response.  Fill in the fields we need.
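
To make the heuristic concrete, here is the formula from the hunk above evaluated at a few cache sizes (the 256 MiB figure is the SDK's usual default block cache size, stated here as an assumption):

def fuse_get_threads(cache_max):
    # One thread per 64 MiB cache slot, minus one slot held in reserve,
    # clamped between 1 and the experimentally determined cap of 7.
    return min(max((cache_max // (64 * 1024 * 1024)) - 1, 1), 7)

MiB = 1024 * 1024
for cache_max in (1, 256 * MiB, 512 * MiB, 1024 * MiB):
    print(cache_max, fuse_get_threads(cache_max))
# 1 byte   -> 1  (floor; the MagicDirApiError test below uses cache_max=1)
# 256 MiB  -> 3
# 512 MiB  -> 7
# 1024 MiB -> 7  (capped)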
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index ece316193d4ee6a82cf04f6a685f09b0af453cf3..1601db59440be8b57c35b988869a1a56229ef92b 100644 (file)
@@ -1088,6 +1088,7 @@ class FuseFsyncTest(FuseMagicTest):
 class MagicDirApiError(FuseMagicTest):
     def setUp(self):
         api = mock.MagicMock()
+        api.keep.block_cache = mock.MagicMock(cache_max=1)
         super(MagicDirApiError, self).setUp(api=api)
         api.collections().get().execute.side_effect = iter([
             Exception('API fail'),