X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ffb304afad93f50fd6ee43ecda6584dd0ac000c4..d49403ac749fffe24ac6357597e6014bb65efa30:/sdk/python/arvados/commands/get.py diff --git a/sdk/python/arvados/commands/get.py b/sdk/python/arvados/commands/get.py index 888fd390f0..bb421def61 100755 --- a/sdk/python/arvados/commands/get.py +++ b/sdk/python/arvados/commands/get.py @@ -1,4 +1,7 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 import argparse import hashlib @@ -14,7 +17,6 @@ import arvados.util as util from arvados._version import __version__ -api_client = None logger = logging.getLogger('arvados.arv-get') parser = argparse.ArgumentParser( @@ -78,6 +80,10 @@ Overwrite existing files while writing. The default behavior is to refuse to write *anything* if any of the output files already exist. As a special case, -f is not needed to write to stdout. """) +group.add_argument('-v', action='count', default=0, + help=""" +Once for verbose mode, twice for debug mode. +""") group.add_argument('--skip-existing', action='store_true', help=""" Skip files that already exist. The default behavior is to refuse to @@ -91,6 +97,15 @@ When getting a collection manifest, strip its access tokens before writing it. """) +parser.add_argument('--threads', type=int, metavar='N', default=4, + help=""" +Set the number of download threads to be used. Take into account that +using lots of threads will increase the RAM requirements. Default is +to use 4 threads. +On high latency installations, using a greater number will improve +overall throughput. +""") + def parse_arguments(arguments, stdout, stderr): args = parser.parse_args(arguments) @@ -130,26 +145,23 @@ def parse_arguments(arguments, stdout, stderr): return args def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): - global api_client - if stdout is sys.stdout and hasattr(stdout, 'buffer'): # in Python 3, write to stdout as binary stdout = stdout.buffer args = parse_arguments(arguments, stdout, stderr) - if api_client is None: - api_client = arvados.api('v1') + logger.setLevel(logging.WARNING - 10 * args.v) + + request_id = arvados.util.new_request_id() + logger.info('X-Request-Id: '+request_id) + + api_client = arvados.api('v1', request_id=request_id) r = re.search(r'^(.*?)(/.*)?$', args.locator) col_loc = r.group(1) get_prefix = r.group(2) if args.r and not get_prefix: get_prefix = os.sep - try: - reader = arvados.CollectionReader(col_loc, num_retries=args.retries) - except Exception as error: - logger.error("failed to read collection: {}".format(error)) - return 1 # User asked to download the collection's manifest if not get_prefix: @@ -159,19 +171,38 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): open_flags |= os.O_EXCL try: if args.destination == "-": - stdout.write(reader.manifest_text(strip=args.strip_manifest).encode()) + write_block_or_manifest( + dest=stdout, src=col_loc, + api_client=api_client, args=args) else: out_fd = os.open(args.destination, open_flags) with os.fdopen(out_fd, 'wb') as out_file: - out_file.write(reader.manifest_text(strip=args.strip_manifest).encode()) + write_block_or_manifest( + dest=out_file, src=col_loc, + api_client=api_client, args=args) except (IOError, OSError) as error: logger.error("can't write to '{}': {}".format(args.destination, error)) return 1 except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error: logger.error("failed to download '{}': {}".format(col_loc, error)) return 1 + except arvados.errors.ArgumentError as error: + if 'Argument to CollectionReader' in str(error): + logger.error("error reading collection: {}".format(error)) + return 1 + else: + raise return 0 + try: + reader = arvados.CollectionReader( + col_loc, api_client=api_client, num_retries=args.retries, + keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)), + get_threads=args.threads) + except Exception as error: + logger.error("failed to read collection: {}".format(error)) + return 1 + # Scan the collection. Make an array of (stream, file, local # destination filename) tuples, and add up total size to extract. todo = [] @@ -255,7 +286,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): if todo_bytes==0 else 100.0*out_bytes/todo_bytes))) elif args.batch_progress: - stderr.write('%s %d read %d total\n' % + stderr.write('%s %d read %d total %d\n' % (sys.argv[0], os.getpid(), out_bytes, todo_bytes)) if digestor: @@ -284,3 +315,14 @@ def files_in_collection(c): elif isinstance(c[i], arvados.collection.Subcollection): for s, f in files_in_collection(c[i]): yield (s, f) + +def write_block_or_manifest(dest, src, api_client, args): + if '+A' in src: + # block locator + kc = arvados.keep.KeepClient(api_client=api_client) + dest.write(kc.get(src, num_retries=args.retries)) + else: + # collection UUID or portable data hash + reader = arvados.CollectionReader( + src, api_client=api_client, num_retries=args.retries) + dest.write(reader.manifest_text(strip=args.strip_manifest).encode())