X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c1e7255a85dfc2807ba78e1cf9d109d896c80b42..7962de491af28d00a9c88412ad4d1e42be83432a:/sdk/python/arvados/commands/put.py diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py index 255021e775..fa648dac45 100644 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -3,132 +3,150 @@ # TODO: # --md5sum - display md5 of each file as read from disk +import apiclient.errors import argparse import arvados import base64 +import datetime import errno import fcntl import hashlib import json import os +import pwd import signal +import socket import sys import tempfile -CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM] - -def parse_arguments(arguments): - parser = argparse.ArgumentParser( - description='Copy data from the local filesystem to Keep.') - - parser.add_argument('paths', metavar='path', type=str, nargs='*', - help=""" - Local file or directory. Default: read from standard input. - """) - - parser.add_argument('--max-manifest-depth', type=int, metavar='N', - default=-1, help=""" - Maximum depth of directory tree to represent in the manifest - structure. A directory structure deeper than this will be represented - as a single stream in the manifest. If N=0, the manifest will contain - a single stream. Default: -1 (unlimited), i.e., exactly one manifest - stream per filesystem directory that contains files. - """) - - group = parser.add_mutually_exclusive_group() - - group.add_argument('--as-stream', action='store_true', dest='stream', - help=""" - Synonym for --stream. - """) - - group.add_argument('--stream', action='store_true', - help=""" - Store the file content and display the resulting manifest on - stdout. Do not write the manifest to Keep or save a Collection object - in Arvados. - """) - - group.add_argument('--as-manifest', action='store_true', dest='manifest', - help=""" - Synonym for --manifest. - """) - - group.add_argument('--in-manifest', action='store_true', dest='manifest', - help=""" - Synonym for --manifest. - """) - - group.add_argument('--manifest', action='store_true', - help=""" - Store the file data and resulting manifest in Keep, save a Collection - object in Arvados, and display the manifest locator (Collection uuid) - on stdout. This is the default behavior. - """) - - group.add_argument('--as-raw', action='store_true', dest='raw', - help=""" - Synonym for --raw. - """) - - group.add_argument('--raw', action='store_true', - help=""" - Store the file content and display the data block locators on stdout, - separated by commas, with a trailing newline. Do not store a - manifest. - """) - - parser.add_argument('--use-filename', type=str, default=None, - dest='filename', help=""" - Synonym for --filename. - """) - - parser.add_argument('--filename', type=str, default=None, - help=""" - Use the given filename in the manifest, instead of the name of the - local file. This is useful when "-" or "/dev/stdin" is given as an - input file. It can be used only if there is exactly one path given and - it is not a directory. Implies --manifest. - """) - - group = parser.add_mutually_exclusive_group() - group.add_argument('--progress', action='store_true', - help=""" - Display human-readable progress on stderr (bytes and, if possible, - percentage of total data size). This is the default behavior when - stderr is a tty. - """) +import arvados.commands._util as arv_cmd - group.add_argument('--no-progress', action='store_true', - help=""" - Do not display human-readable progress on stderr, even if stderr is a - tty. - """) +CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM] - group.add_argument('--batch-progress', action='store_true', - help=""" - Display machine-readable progress on stderr (bytes and, if known, - total data size). - """) +upload_opts = argparse.ArgumentParser(add_help=False) + +upload_opts.add_argument('paths', metavar='path', type=str, nargs='*', + help=""" +Local file or directory. Default: read from standard input. +""") + +upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N', + default=-1, help=""" +Maximum depth of directory tree to represent in the manifest +structure. A directory structure deeper than this will be represented +as a single stream in the manifest. If N=0, the manifest will contain +a single stream. Default: -1 (unlimited), i.e., exactly one manifest +stream per filesystem directory that contains files. +""") + +upload_opts.add_argument('--project-uuid', metavar='UUID', help=""" +When a Collection is made, make a Link to save it under the specified project. +""") + +upload_opts.add_argument('--name', help=""" +When a Collection is linked to a project, use the specified name. +""") + +_group = upload_opts.add_mutually_exclusive_group() + +_group.add_argument('--as-stream', action='store_true', dest='stream', + help=""" +Synonym for --stream. +""") + +_group.add_argument('--stream', action='store_true', + help=""" +Store the file content and display the resulting manifest on +stdout. Do not write the manifest to Keep or save a Collection object +in Arvados. +""") + +_group.add_argument('--as-manifest', action='store_true', dest='manifest', + help=""" +Synonym for --manifest. +""") + +_group.add_argument('--in-manifest', action='store_true', dest='manifest', + help=""" +Synonym for --manifest. +""") + +_group.add_argument('--manifest', action='store_true', + help=""" +Store the file data and resulting manifest in Keep, save a Collection +object in Arvados, and display the manifest locator (Collection uuid) +on stdout. This is the default behavior. +""") + +_group.add_argument('--as-raw', action='store_true', dest='raw', + help=""" +Synonym for --raw. +""") + +_group.add_argument('--raw', action='store_true', + help=""" +Store the file content and display the data block locators on stdout, +separated by commas, with a trailing newline. Do not store a +manifest. +""") + +upload_opts.add_argument('--use-filename', type=str, default=None, + dest='filename', help=""" +Synonym for --filename. +""") + +upload_opts.add_argument('--filename', type=str, default=None, + help=""" +Use the given filename in the manifest, instead of the name of the +local file. This is useful when "-" or "/dev/stdin" is given as an +input file. It can be used only if there is exactly one path given and +it is not a directory. Implies --manifest. +""") + +run_opts = argparse.ArgumentParser(add_help=False) +_group = run_opts.add_mutually_exclusive_group() +_group.add_argument('--progress', action='store_true', + help=""" +Display human-readable progress on stderr (bytes and, if possible, +percentage of total data size). This is the default behavior when +stderr is a tty. +""") + +_group.add_argument('--no-progress', action='store_true', + help=""" +Do not display human-readable progress on stderr, even if stderr is a +tty. +""") + +_group.add_argument('--batch-progress', action='store_true', + help=""" +Display machine-readable progress on stderr (bytes and, if known, +total data size). +""") + +_group = run_opts.add_mutually_exclusive_group() +_group.add_argument('--resume', action='store_true', default=True, + help=""" +Continue interrupted uploads from cached state (default). +""") +_group.add_argument('--no-resume', action='store_false', dest='resume', + help=""" +Do not continue interrupted uploads from cached state. +""") + +arg_parser = argparse.ArgumentParser( + description='Copy data from the local filesystem to Keep.', + parents=[upload_opts, run_opts]) - group = parser.add_mutually_exclusive_group() - group.add_argument('--resume', action='store_true', default=True, - help=""" - Continue interrupted uploads from cached state (default). - """) - group.add_argument('--no-resume', action='store_false', dest='resume', - help=""" - Do not continue interrupted uploads from cached state. - """) - - args = parser.parse_args(arguments) +def parse_arguments(arguments): + args = arg_parser.parse_args(arguments) if len(args.paths) == 0: args.paths += ['/dev/stdin'] if len(args.paths) != 1 or os.path.isdir(args.paths[0]): if args.filename: - parser.error(""" + arg_parser.error(""" --filename argument cannot be used when storing a directory or multiple files. """) @@ -150,17 +168,7 @@ class ResumeCacheConflict(Exception): class ResumeCache(object): - CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put') - - @classmethod - def setup_user_cache(cls): - try: - os.makedirs(cls.CACHE_DIR) - except OSError as error: - if error.errno != errno.EEXIST: - raise - else: - os.chmod(cls.CACHE_DIR, 0o700) + CACHE_DIR = '.cache/arvados/arv-put' def __init__(self, file_spec): self.cache_file = open(file_spec, 'a+') @@ -177,7 +185,9 @@ class ResumeCache(object): md5.update(str(max(args.max_manifest_depth, -1))) elif args.filename: md5.update(args.filename) - return os.path.join(cls.CACHE_DIR, md5.hexdigest()) + return os.path.join( + arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'), + md5.hexdigest()) def _lock_file(self, fileobj): try: @@ -328,9 +338,56 @@ def progress_writer(progress_func, outfile=sys.stderr): def exit_signal_handler(sigcode, frame): sys.exit(-sigcode) -def main(arguments=None): - ResumeCache.setup_user_cache() +def check_project_exists(project_uuid): + try: + arvados.api('v1').groups().get(uuid=project_uuid).execute() + except (apiclient.errors.Error, arvados.errors.NotFoundError) as error: + raise ValueError("Project {} not found ({})".format(project_uuid, + error)) + else: + return True + +def prep_project_link(args, stderr, project_exists=check_project_exists): + # Given the user's command line arguments, return a dictionary with data + # to create the desired project link for this Collection, or None. + # Raises ValueError if the arguments request something impossible. + making_collection = not (args.raw or args.stream) + any_link_spec = args.project_uuid or args.name + if not making_collection: + if any_link_spec: + raise ValueError("Requested a Link without creating a Collection") + return None + elif not any_link_spec: + stderr.write( + "arv-put: No --project-uuid or --name specified. This data will be cached\n" + "in Keep. You will need to find this upload by its locator(s) later.\n") + return None + elif not args.project_uuid: + raise ValueError("--name requires --project-uuid") + elif not project_exists(args.project_uuid): + raise ValueError("Project {} not found".format(args.project_uuid)) + link = {'tail_uuid': args.project_uuid, 'link_class': 'name'} + if args.name: + link['name'] = args.name + return link + +def create_project_link(locator, link): + link['head_uuid'] = locator + link.setdefault('name', "Collection saved by {}@{} at {}".format( + pwd.getpwuid(os.getuid()).pw_name, + socket.gethostname(), + datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"))) + return arvados.api('v1').links().create(body=link).execute() + +def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): + status = 0 + args = parse_arguments(arguments) + try: + project_link = prep_project_link(args, stderr) + except ValueError as error: + print >>stderr, "arv-put: {}.".format(error) + sys.exit(2) if args.progress: reporter = progress_writer(human_progress) @@ -338,17 +395,25 @@ def main(arguments=None): reporter = progress_writer(machine_progress) else: reporter = None + bytes_expected = expected_bytes_for(args.paths) - try: - resume_cache = ResumeCache(ResumeCache.make_path(args)) - if not args.resume: - resume_cache.restart() - except ResumeCacheConflict: - print "arv-put: Another process is already uploading this data." - sys.exit(1) - - writer = ArvPutCollectionWriter.from_cache( - resume_cache, reporter, expected_bytes_for(args.paths)) + resume_cache = None + if args.resume: + try: + resume_cache = ResumeCache(ResumeCache.make_path(args)) + except (IOError, OSError, ValueError): + pass # Couldn't open cache directory/file. Continue without it. + except ResumeCacheConflict: + print >>stderr, "\n".join([ + "arv-put: Another process is already uploading this data.", + " Use --no-resume if this is really what you want."]) + sys.exit(1) + + if resume_cache is None: + writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected) + else: + writer = ArvPutCollectionWriter.from_cache( + resume_cache, reporter, bytes_expected) # Install our signal handler for each code in CAUGHT_SIGNALS, and save # the originals. @@ -356,11 +421,11 @@ def main(arguments=None): for sigcode in CAUGHT_SIGNALS} if writer.bytes_written > 0: # We're resuming a previous upload. - print >>sys.stderr, "\n".join([ + print >>stderr, "\n".join([ "arv-put: Resuming previous upload from last checkpoint.", " Use the --no-resume option to start over."]) - writer.report_progress() + writer.report_progress() writer.do_queued_work() # Do work resumed from cache. for path in args.paths: # Copy file data to Keep. if os.path.isdir(path): @@ -372,12 +437,12 @@ def main(arguments=None): writer.finish_current_stream() if args.progress: # Print newline to split stderr from stdout for humans. - print >>sys.stderr + print >>stderr if args.stream: - print writer.manifest_text(), + output = writer.manifest_text() elif args.raw: - print ','.join(writer.data_locators()) + output = ','.join(writer.data_locators()) else: # Register the resulting collection in Arvados. collection = arvados.api().collections().create( @@ -388,12 +453,30 @@ def main(arguments=None): ).execute() # Print the locator (uuid) of the new collection. - print collection['uuid'] + output = collection['uuid'] + if project_link is not None: + try: + create_project_link(output, project_link) + except apiclient.errors.Error as error: + print >>stderr, ( + "arv-put: Error adding Collection to project: {}.".format( + error)) + status = 1 + + stdout.write(output) + if not output.endswith('\n'): + stdout.write('\n') for sigcode, orig_handler in orig_signal_handlers.items(): signal.signal(sigcode, orig_handler) - resume_cache.destroy() + if status != 0: + sys.exit(status) + + if resume_cache is not None: + resume_cache.destroy() + + return output if __name__ == '__main__': main()