17004: Fix lingering resource error
[arvados.git] / sdk / python / arvados / commands / get.py
index d762bbd0ef7fc7f5e863e99b54a1fa2f6e2a1f29..bb421def618cddd36ba7d2241e2b1e81b58581ac 100755 (executable)
@@ -1,4 +1,7 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
 
 import argparse
 import hashlib
@@ -10,14 +13,11 @@ import logging
 
 import arvados
 import arvados.commands._util as arv_cmd
+import arvados.util as util
 
 from arvados._version import __version__
 
-api_client = None
-
-def abort(msg, code=1):
-    print >>sys.stderr, "arv-get:", msg
-    exit(code)
+logger = logging.getLogger('arvados.arv-get')
 
 parser = argparse.ArgumentParser(
     description='Copy data from Keep to a local file or pipe.',
@@ -80,6 +80,10 @@ Overwrite existing files while writing. The default behavior is to
 refuse to write *anything* if any of the output files already
 exist. As a special case, -f is not needed to write to stdout.
 """)
+group.add_argument('-v', action='count', default=0,
+                    help="""
+Once for verbose mode, twice for debug mode.
+""")
 group.add_argument('--skip-existing', action='store_true',
                    help="""
 Skip files that already exist. The default behavior is to refuse to
@@ -87,9 +91,23 @@ write *anything* if any files exist that would have to be
 overwritten. This option causes even devices, sockets, and fifos to be
 skipped.
 """)
+group.add_argument('--strip-manifest', action='store_true', default=False,
+                   help="""
+When getting a collection manifest, strip its access tokens before writing
+it.
+""")
 
-def parse_arguments(arguments, logger):
-    args = parser.parse_args()
+parser.add_argument('--threads', type=int, metavar='N', default=4,
+                    help="""
+Set the number of download threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 4 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
+def parse_arguments(arguments, stdout, stderr):
+    args = parser.parse_args(arguments)
 
     if args.locator[-1] == os.sep:
         args.r = True
@@ -120,27 +138,32 @@ def parse_arguments(arguments, logger):
     # either going to a named file, or going (via stdout) to something
     # that isn't a tty.
     if (not (args.batch_progress or args.no_progress)
-        and sys.stderr.isatty()
+        and stderr.isatty()
         and (args.destination != '-'
-             or not sys.stdout.isatty())):
+             or not stdout.isatty())):
         args.progress = True
     return args
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
-    global api_client
-    
-    logger = logging.getLogger('arvados.arv-get')
-    args = parse_arguments(arguments, logger)
-    if api_client is None:
-        api_client = arvados.api('v1')
+    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
+        # in Python 3, write to stdout as binary
+        stdout = stdout.buffer
+
+    args = parse_arguments(arguments, stdout, stderr)
+    logger.setLevel(logging.WARNING - 10 * args.v)
+
+    request_id = arvados.util.new_request_id()
+    logger.info('X-Request-Id: '+request_id)
+
+    api_client = arvados.api('v1', request_id=request_id)
 
     r = re.search(r'^(.*?)(/.*)?$', args.locator)
-    collection = r.group(1)
+    col_loc = r.group(1)
     get_prefix = r.group(2)
     if args.r and not get_prefix:
         get_prefix = os.sep
-    reader = arvados.CollectionReader(collection, num_retries=args.retries)
 
+    # User asked to download the collection's manifest
     if not get_prefix:
         if not args.n:
             open_flags = os.O_CREAT | os.O_WRONLY
@@ -148,44 +171,78 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                 open_flags |= os.O_EXCL
             try:
                 if args.destination == "-":
-                    sys.stdout.write(reader.manifest_text())
+                    write_block_or_manifest(
+                        dest=stdout, src=col_loc,
+                        api_client=api_client, args=args)
                 else:
                     out_fd = os.open(args.destination, open_flags)
                     with os.fdopen(out_fd, 'wb') as out_file:
-                        out_file.write(reader.manifest_text())
+                        write_block_or_manifest(
+                            dest=out_file, src=col_loc,
+                            api_client=api_client, args=args)
             except (IOError, OSError) as error:
-                abort("can't write to '{}': {}".format(args.destination, error))
+                logger.error("can't write to '{}': {}".format(args.destination, error))
+                return 1
             except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
-                abort("failed to download '{}': {}".format(collection, error))
-        sys.exit(0)
+                logger.error("failed to download '{}': {}".format(col_loc, error))
+                return 1
+            except arvados.errors.ArgumentError as error:
+                if 'Argument to CollectionReader' in str(error):
+                    logger.error("error reading collection: {}".format(error))
+                    return 1
+                else:
+                    raise
+        return 0
+
+    try:
+        reader = arvados.CollectionReader(
+            col_loc, api_client=api_client, num_retries=args.retries,
+            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
+            get_threads=args.threads)
+    except Exception as error:
+        logger.error("failed to read collection: {}".format(error))
+        return 1
 
     # Scan the collection. Make an array of (stream, file, local
     # destination filename) tuples, and add up total size to extract.
     todo = []
     todo_bytes = 0
     try:
-        for s, f in files_in_collection(reader):
-            if get_prefix and get_prefix[-1] == os.sep:
-                if 0 != string.find(os.path.join(s.stream_name(), f.name),
-                                    '.' + get_prefix):
-                    continue
-                if args.destination == "-":
-                    dest_path = "-"
-                else:
-                    dest_path = os.path.join(
-                        args.destination,
-                        os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
-                    if (not (args.n or args.f or args.skip_existing) and
-                        os.path.exists(dest_path)):
-                        abort('Local file %s already exists.' % (dest_path,))
-            else:
-                if os.path.join(s.stream_name(), f.name) != '.' + get_prefix:
-                    continue
-                dest_path = args.destination
-            todo += [(s, f, dest_path)]
-            todo_bytes += f.size()
-    except arvados.errors.NotFoundError as e:
-        abort(e)
+        if get_prefix == os.sep:
+            item = reader
+        else:
+            item = reader.find('.' + get_prefix)
+
+        if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
+            # If the user asked for a file and we got a subcollection, error out.
+            if get_prefix[-1] != os.sep:
+                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
+                return 1
+            # If the user asked stdout as a destination, error out.
+            elif args.destination == '-':
+                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
+                return 1
+            # User asked for a subcollection, and that's what was found. Add up total size
+            # to download.
+            for s, f in files_in_collection(item):
+                dest_path = os.path.join(
+                    args.destination,
+                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
+                if (not (args.n or args.f or args.skip_existing) and
+                    os.path.exists(dest_path)):
+                    logger.error('Local file %s already exists.' % (dest_path,))
+                    return 1
+                todo += [(s, f, dest_path)]
+                todo_bytes += f.size()
+        elif isinstance(item, arvados.arvfile.ArvadosFile):
+            todo += [(item.parent, item, args.destination)]
+            todo_bytes += item.size()
+        else:
+            logger.error("'{}' not found.".format('.' + get_prefix))
+            return 1
+    except (IOError, arvados.errors.NotFoundError) as e:
+        logger.error(e)
+        return 1
 
     out_bytes = 0
     for s, f, outfilename in todo:
@@ -193,7 +250,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         digestor = None
         if not args.n:
             if outfilename == "-":
-                outfile = sys.stdout
+                outfile = stdout
             else:
                 if args.skip_existing and os.path.exists(outfilename):
                     logger.debug('Local file %s exists. Skipping.', outfilename)
@@ -202,17 +259,19 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                    os.path.isdir(outfilename)):
                     # Good thing we looked again: apparently this file wasn't
                     # here yet when we checked earlier.
-                    abort('Local file %s already exists.' % (outfilename,))
+                    logger.error('Local file %s already exists.' % (outfilename,))
+                    return 1
                 if args.r:
                     arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
                 try:
                     outfile = open(outfilename, 'wb')
                 except Exception as error:
-                    abort('Open(%s) failed: %s' % (outfilename, error))
+                    logger.error('Open(%s) failed: %s' % (outfilename, error))
+                    return 1
         if args.hash:
             digestor = hashlib.new(args.hash)
         try:
-            with s.open(f.name, 'r') as file_reader:
+            with s.open(f.name, 'rb') as file_reader:
                 for data in file_reader.readall():
                     if outfile:
                         outfile.write(data)
@@ -220,30 +279,34 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                         digestor.update(data)
                     out_bytes += len(data)
                     if args.progress:
-                        sys.stderr.write('\r%d MiB / %d MiB %.1f%%' %
-                                         (out_bytes >> 20,
-                                          todo_bytes >> 20,
-                                          (100
-                                           if todo_bytes==0
-                                           else 100.0*out_bytes/todo_bytes)))
+                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
+                                     (out_bytes >> 20,
+                                      todo_bytes >> 20,
+                                      (100
+                                       if todo_bytes==0
+                                       else 100.0*out_bytes/todo_bytes)))
                     elif args.batch_progress:
-                        sys.stderr.write('%s %d read %d total\n' %
-                                         (sys.argv[0], os.getpid(),
-                                          out_bytes, todo_bytes))
+                        stderr.write('%s %d read %d total %d\n' %
+                                     (sys.argv[0], os.getpid(),
+                                      out_bytes, todo_bytes))
             if digestor:
-                sys.stderr.write("%s  %s/%s\n"
-                                 % (digestor.hexdigest(), s.stream_name(), f.name))
+                stderr.write("%s  %s/%s\n"
+                             % (digestor.hexdigest(), s.stream_name(), f.name))
         except KeyboardInterrupt:
             if outfile and (outfile.fileno() > 2) and not outfile.closed:
                 os.unlink(outfile.name)
             break
+        finally:
+            if outfile != None and outfile != stdout:
+                outfile.close()
 
     if args.progress:
-        sys.stderr.write('\n')
+        stderr.write('\n')
+    return 0
 
 def files_in_collection(c):
     # Sort first by file type, then alphabetically by file path.
-    for i in sorted(c.keys(),
+    for i in sorted(list(c.keys()),
                     key=lambda k: (
                         isinstance(c[k], arvados.collection.Subcollection),
                         k.upper())):
@@ -252,3 +315,14 @@ def files_in_collection(c):
         elif isinstance(c[i], arvados.collection.Subcollection):
             for s, f in files_in_collection(c[i]):
                 yield (s, f)
+
+def write_block_or_manifest(dest, src, api_client, args):
+    if '+A' in src:
+        # block locator
+        kc = arvados.keep.KeepClient(api_client=api_client)
+        dest.write(kc.get(src, num_retries=args.retries))
+    else:
+        # collection UUID or portable data hash
+        reader = arvados.CollectionReader(
+            src, api_client=api_client, num_retries=args.retries)
+        dest.write(reader.manifest_text(strip=args.strip_manifest).encode())