#!/usr/bin/env python3
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import hashlib
import os
import re
import string
import sys
import logging

import arvados
import arvados.commands._util as arv_cmd
import arvados.util as util
from arvados._version import __version__

# Shared API client. Left as None so main() can create one lazily the
# first time it runs, or so tests can inject a stub before calling main().
api_client = None
logger = logging.getLogger('arvados.arv-get')

# Command-line interface. arv_cmd.retry_opt contributes the shared
# retry option (read later as args.retries).
parser = argparse.ArgumentParser(
    description='Copy data from Keep to a local file or pipe.',
    parents=[arv_cmd.retry_opt])
parser.add_argument('--version', action='version',
                    version="%s %s" % (sys.argv[0], __version__),
                    help='Print version and exit.')
parser.add_argument('locator', type=str,
                    help="""
Collection locator, optionally with a file path or prefix.
""")
parser.add_argument('destination', type=str, nargs='?', default='-',
                    help="""
Local file or directory where the data is to be written. Default: stdout.
""")

# Progress reporting: at most one of --progress / --no-progress /
# --batch-progress may be given.
group = parser.add_mutually_exclusive_group()
group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when it is
not expected to interfere with the output: specifically, stderr is a tty
_and_ either stdout is not a tty, or output is being written to named
files rather than stdout.
""")
group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr.
""")
group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# Hash reporting: --hash names an algorithm; --md5sum is a shortcut that
# stores 'md5' into the same args.hash destination, hence the exclusive
# group.
group = parser.add_mutually_exclusive_group()
group.add_argument('--hash',
                   help="""
Display the hash of each file as it is read from Keep, using the given
hash algorithm. Supported algorithms include md5, sha1, sha224,
sha256, sha384, and sha512.
""")
group.add_argument('--md5sum', action='store_const',
                   dest='hash', const='md5',
                   help="""
Display the MD5 hash of each file as it is read from Keep.
""")

parser.add_argument('-n', action='store_true',
                    help="""
Do not write any data -- just read from Keep, and report md5sums if
requested.
""")
parser.add_argument('-r', action='store_true',
                    help="""
Retrieve all files in the specified collection/prefix. This is the
default behavior if the "locator" argument ends with a forward slash.
""")

# NOTE(review): this exclusive group makes -f, -v, --skip-existing and
# --strip-manifest all mutually exclusive with one another. Only
# -f vs. --skip-existing look like a genuine conflict; confirm whether
# -v and --strip-manifest were meant to be plain parser arguments.
group = parser.add_mutually_exclusive_group()
group.add_argument('-f', action='store_true',
                   help="""
Overwrite existing files while writing. The default behavior is to
refuse to write *anything* if any of the output files already exist.
As a special case, -f is not needed to write to stdout.
""")
group.add_argument('-v', action='count', default=0,
                   help="""
Once for verbose mode, twice for debug mode.
""")
group.add_argument('--skip-existing', action='store_true',
                   help="""
Skip files that already exist. The default behavior is to refuse to
write *anything* if any files exist that would have to be overwritten.
This option causes even devices, sockets, and fifos to be skipped.
""")
group.add_argument('--strip-manifest', action='store_true', default=False,
                   help="""
When getting a collection manifest, strip its access tokens before writing
it.
""")

parser.add_argument('--threads', type=int, metavar='N', default=4,
                    help="""
Set the number of download threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 4 threads.
On high latency installations, using a greater number will improve
overall throughput.
""") def parse_arguments(arguments, stdout, stderr): args = parser.parse_args(arguments) if args.locator[-1] == os.sep: args.r = True if (args.r and not args.n and not (args.destination and os.path.isdir(args.destination))): parser.error('Destination is not a directory.') if not args.r and (os.path.isdir(args.destination) or args.destination[-1] == os.path.sep): args.destination = os.path.join(args.destination, os.path.basename(args.locator)) logger.debug("Appended source file name to destination directory: %s", args.destination) if args.destination == '/dev/stdout': args.destination = "-" if args.destination == '-': # Normally you have to use -f to write to a file (or device) that # already exists, but "-" and "/dev/stdout" are common enough to # merit a special exception. args.f = True else: args.destination = args.destination.rstrip(os.sep) # Turn on --progress by default if stderr is a tty and output is # either going to a named file, or going (via stdout) to something # that isn't a tty. 
if (not (args.batch_progress or args.no_progress) and stderr.isatty() and (args.destination != '-' or not stdout.isatty())): args.progress = True return args def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): global api_client if stdout is sys.stdout and hasattr(stdout, 'buffer'): # in Python 3, write to stdout as binary stdout = stdout.buffer args = parse_arguments(arguments, stdout, stderr) logger.setLevel(logging.WARNING - 10 * args.v) request_id = arvados.util.new_request_id() logger.info('X-Request-Id: '+request_id) if api_client is None: api_client = arvados.api('v1', request_id=request_id) r = re.search(r'^(.*?)(/.*)?$', args.locator) col_loc = r.group(1) get_prefix = r.group(2) if args.r and not get_prefix: get_prefix = os.sep # User asked to download the collection's manifest if not get_prefix: if not args.n: open_flags = os.O_CREAT | os.O_WRONLY if not args.f: open_flags |= os.O_EXCL try: if args.destination == "-": write_block_or_manifest( dest=stdout, src=col_loc, api_client=api_client, args=args) else: out_fd = os.open(args.destination, open_flags) with os.fdopen(out_fd, 'wb') as out_file: write_block_or_manifest( dest=out_file, src=col_loc, api_client=api_client, args=args) except (IOError, OSError) as error: logger.error("can't write to '{}': {}".format(args.destination, error)) return 1 except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error: logger.error("failed to download '{}': {}".format(col_loc, error)) return 1 except arvados.errors.ArgumentError as error: if 'Argument to CollectionReader' in str(error): logger.error("error reading collection: {}".format(error)) return 1 else: raise return 0 try: reader = arvados.CollectionReader( col_loc, api_client=api_client, num_retries=args.retries, keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)), get_threads=args.threads) except Exception as error: logger.error("failed to read collection: {}".format(error)) 
return 1 # Scan the collection. Make an array of (stream, file, local # destination filename) tuples, and add up total size to extract. todo = [] todo_bytes = 0 try: if get_prefix == os.sep: item = reader else: item = reader.find('.' + get_prefix) if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader): # If the user asked for a file and we got a subcollection, error out. if get_prefix[-1] != os.sep: logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix)) return 1 # If the user asked stdout as a destination, error out. elif args.destination == '-': logger.error("cannot use 'stdout' as destination when downloading multiple files.") return 1 # User asked for a subcollection, and that's what was found. Add up total size # to download. for s, f in files_in_collection(item): dest_path = os.path.join( args.destination, os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:]) if (not (args.n or args.f or args.skip_existing) and os.path.exists(dest_path)): logger.error('Local file %s already exists.' % (dest_path,)) return 1 todo += [(s, f, dest_path)] todo_bytes += f.size() elif isinstance(item, arvados.arvfile.ArvadosFile): todo += [(item.parent, item, args.destination)] todo_bytes += item.size() else: logger.error("'{}' not found.".format('.' + get_prefix)) return 1 except (IOError, arvados.errors.NotFoundError) as e: logger.error(e) return 1 out_bytes = 0 for s, f, outfilename in todo: outfile = None digestor = None if not args.n: if outfilename == "-": outfile = stdout else: if args.skip_existing and os.path.exists(outfilename): logger.debug('Local file %s exists. Skipping.', outfilename) continue elif not args.f and (os.path.isfile(outfilename) or os.path.isdir(outfilename)): # Good thing we looked again: apparently this file wasn't # here yet when we checked earlier. logger.error('Local file %s already exists.' 
% (outfilename,)) return 1 if args.r: arvados.util.mkdir_dash_p(os.path.dirname(outfilename)) try: outfile = open(outfilename, 'wb') except Exception as error: logger.error('Open(%s) failed: %s' % (outfilename, error)) return 1 if args.hash: digestor = hashlib.new(args.hash) try: with s.open(f.name, 'rb') as file_reader: for data in file_reader.readall(): if outfile: outfile.write(data) if digestor: digestor.update(data) out_bytes += len(data) if args.progress: stderr.write('\r%d MiB / %d MiB %.1f%%' % (out_bytes >> 20, todo_bytes >> 20, (100 if todo_bytes==0 else 100.0*out_bytes/todo_bytes))) elif args.batch_progress: stderr.write('%s %d read %d total\n' % (sys.argv[0], os.getpid(), out_bytes, todo_bytes)) if digestor: stderr.write("%s %s/%s\n" % (digestor.hexdigest(), s.stream_name(), f.name)) except KeyboardInterrupt: if outfile and (outfile.fileno() > 2) and not outfile.closed: os.unlink(outfile.name) break finally: if outfile != None and outfile != stdout: outfile.close() if args.progress: stderr.write('\n') return 0 def files_in_collection(c): # Sort first by file type, then alphabetically by file path. for i in sorted(list(c.keys()), key=lambda k: ( isinstance(c[k], arvados.collection.Subcollection), k.upper())): if isinstance(c[i], arvados.arvfile.ArvadosFile): yield (c, c[i]) elif isinstance(c[i], arvados.collection.Subcollection): for s, f in files_in_collection(c[i]): yield (s, f) def write_block_or_manifest(dest, src, api_client, args): if '+A' in src: # block locator kc = arvados.keep.KeepClient(api_client=api_client) dest.write(kc.get(src, num_retries=args.retries)) else: # collection UUID or portable data hash reader = arvados.CollectionReader( src, api_client=api_client, num_retries=args.retries) dest.write(reader.manifest_text(strip=args.strip_manifest).encode())