17004: Fix lingering resource error
[arvados.git] / sdk / python / arvados / commands / get.py
index 881fdd6ad0f968eddf3ab810d90e62df70b63895..bb421def618cddd36ba7d2241e2b1e81b58581ac 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
@@ -17,7 +17,6 @@ import arvados.util as util
 
 from arvados._version import __version__
 
-api_client = None
 logger = logging.getLogger('arvados.arv-get')
 
 parser = argparse.ArgumentParser(
@@ -81,6 +80,10 @@ Overwrite existing files while writing. The default behavior is to
 refuse to write *anything* if any of the output files already
 exist. As a special case, -f is not needed to write to stdout.
 """)
+group.add_argument('-v', action='count', default=0,
+                    help="""
+Once for verbose mode, twice for debug mode.
+""")
 group.add_argument('--skip-existing', action='store_true',
                    help="""
 Skip files that already exist. The default behavior is to refuse to
@@ -94,6 +97,15 @@ When getting a collection manifest, strip its access tokens before writing
 it.
 """)
 
+parser.add_argument('--threads', type=int, metavar='N', default=4,
+                    help="""
+Set the number of download threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 4 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
 def parse_arguments(arguments, stdout, stderr):
     args = parser.parse_args(arguments)
 
@@ -133,15 +145,17 @@ def parse_arguments(arguments, stdout, stderr):
     return args
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
-    global api_client
-
     if stdout is sys.stdout and hasattr(stdout, 'buffer'):
         # in Python 3, write to stdout as binary
         stdout = stdout.buffer
 
     args = parse_arguments(arguments, stdout, stderr)
-    if api_client is None:
-        api_client = arvados.api('v1')
+    logger.setLevel(logging.WARNING - 10 * args.v)
+
+    request_id = arvados.util.new_request_id()
+    logger.info('X-Request-Id: '+request_id)
+
+    api_client = arvados.api('v1', request_id=request_id)
 
     r = re.search(r'^(.*?)(/.*)?$', args.locator)
     col_loc = r.group(1)
@@ -157,14 +171,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                 open_flags |= os.O_EXCL
             try:
                 if args.destination == "-":
-                    write_block_or_manifest(dest=stdout, src=col_loc,
-                                            api_client=api_client, args=args)
+                    write_block_or_manifest(
+                        dest=stdout, src=col_loc,
+                        api_client=api_client, args=args)
                 else:
                     out_fd = os.open(args.destination, open_flags)
                     with os.fdopen(out_fd, 'wb') as out_file:
-                        write_block_or_manifest(dest=out_file,
-                                                src=col_loc, api_client=api_client,
-                                                args=args)
+                        write_block_or_manifest(
+                            dest=out_file, src=col_loc,
+                            api_client=api_client, args=args)
             except (IOError, OSError) as error:
                 logger.error("can't write to '{}': {}".format(args.destination, error))
                 return 1
@@ -180,7 +195,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         return 0
 
     try:
-        reader = arvados.CollectionReader(col_loc, num_retries=args.retries)
+        reader = arvados.CollectionReader(
+            col_loc, api_client=api_client, num_retries=args.retries,
+            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
+            get_threads=args.threads)
     except Exception as error:
         logger.error("failed to read collection: {}".format(error))
         return 1
@@ -268,7 +286,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                        if todo_bytes==0
                                        else 100.0*out_bytes/todo_bytes)))
                     elif args.batch_progress:
-                        stderr.write('%s %d read %d total\n' %
+                        stderr.write('%s %d read %d total %d\n' %
                                      (sys.argv[0], os.getpid(),
                                       out_bytes, todo_bytes))
             if digestor:
@@ -305,5 +323,6 @@ def write_block_or_manifest(dest, src, api_client, args):
         dest.write(kc.get(src, num_retries=args.retries))
     else:
         # collection UUID or portable data hash
-        reader = arvados.CollectionReader(src, num_retries=args.retries)
+        reader = arvados.CollectionReader(
+            src, api_client=api_client, num_retries=args.retries)
         dest.write(reader.manifest_text(strip=args.strip_manifest).encode())