X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4f1085f353d44600643a8e9dd6b43a39131e7946..c005c5ab76492b844e84d1c66f75797bd98d0996:/sdk/python/arvados/commands/put.py

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index ef34e07199..8cf56b6a38 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -3,21 +3,26 @@
 # TODO:
 # --md5sum - display md5 of each file as read from disk

+import apiclient.errors
 import argparse
 import arvados
 import base64
+import datetime
 import errno
 import fcntl
 import hashlib
 import json
 import os
+import pwd
 import signal
+import socket
 import sys
 import tempfile

 import arvados.commands._util as arv_cmd

 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
+api_client = None

 upload_opts = argparse.ArgumentParser(add_help=False)

@@ -35,6 +40,16 @@ a single stream. Default: -1 (unlimited), i.e., exactly
 one manifest stream per
 filesystem directory that contains files.
 """)

+upload_opts.add_argument('--project-uuid', metavar='UUID', help="""
+Store the collection in the specified project, instead of your Home
+project.
+""")
+
+upload_opts.add_argument('--name', help="""
+Save the collection with the specified name, rather than the default
+generic name "Saved at {time} by {username}@{host}".
+""")
+
 _group = upload_opts.add_mutually_exclusive_group()

 _group.add_argument('--as-stream', action='store_true', dest='stream',
@@ -158,10 +173,6 @@ class ResumeCacheConflict(Exception):
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'

-    @classmethod
-    def setup_user_cache(cls):
-        return arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700)
-
     def __init__(self, file_spec):
         self.cache_file = open(file_spec, 'a+')
         self._lock_file(self.cache_file)
@@ -177,7 +188,9 @@ class ResumeCache(object):
             md5.update(str(max(args.max_manifest_depth, -1)))
         elif args.filename:
             md5.update(args.filename)
-        return os.path.join(cls.CACHE_DIR, md5.hexdigest())
+        return os.path.join(
+            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
+            md5.hexdigest())

     def _lock_file(self, fileobj):
         try:
@@ -226,13 +239,14 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
     STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                    ['bytes_written', '_seen_inputs'])

-    def __init__(self, cache=None, reporter=None, bytes_expected=None):
+    def __init__(self, cache=None, reporter=None, bytes_expected=None,
+                 api_client=None):
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__()
+        super(ArvPutCollectionWriter, self).__init__(api_client)

     @classmethod
     def from_cache(cls, cache, reporter=None, bytes_expected=None):
@@ -328,8 +342,57 @@ def progress_writer(progress_func, outfile=sys.stderr):
 def exit_signal_handler(sigcode, frame):
     sys.exit(-sigcode)

-def main(arguments=None, output_to=sys.stdout):
+def check_project_exists(project_uuid):
+    try:
+        api_client.groups().get(uuid=project_uuid).execute()
+    except (apiclient.errors.Error, arvados.errors.NotFoundError) as error:
+        raise ValueError("Project {} not found ({})".format(project_uuid,
+                                                            error))
+    else:
+        return True
+
+def prep_project_link(args, stderr, project_exists=check_project_exists):
+    # Given the user's command line arguments, return a dictionary with data
+    # to create the desired project link for this Collection, or None.
+    # Raises ValueError if the arguments request something impossible.
+    making_collection = not (args.raw or args.stream)
+    if not making_collection:
+        if args.name or args.project_uuid:
+            raise ValueError("Requested a Link without creating a Collection")
+        return None
+    link = {'tail_uuid': args.project_uuid,
+            'link_class': 'name',
+            'name': args.name}
+    if not link['tail_uuid']:
+        link['tail_uuid'] = api_client.users().current().execute()['uuid']
+    elif not project_exists(link['tail_uuid']):
+        raise ValueError("Project {} not found".format(args.project_uuid))
+    if not link['name']:
+        link['name'] = "Saved at {} by {}@{}".format(
+            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
+            pwd.getpwuid(os.getuid()).pw_name,
+            socket.gethostname())
+        stderr.write(
+            "arv-put: No --name specified. Saving as \"%s\"\n" % link['name'])
+    link['owner_uuid'] = link['tail_uuid']
+    return link
+
+def create_project_link(locator, link):
+    link['head_uuid'] = locator
+    return api_client.links().create(body=link).execute()
+
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+    global api_client
+    if api_client is None:
+        api_client = arvados.api('v1')
+    status = 0
+
     args = parse_arguments(arguments)

+    try:
+        project_link = prep_project_link(args, stderr)
+    except ValueError as error:
+        print >>stderr, "arv-put: {}.".format(error)
+        sys.exit(2)
+
     if args.progress:
         reporter = progress_writer(human_progress)
@@ -340,21 +403,20 @@ def main(arguments=None, output_to=sys.stdout):
     bytes_expected = expected_bytes_for(args.paths)

     resume_cache = None
-    try:
-        if ResumeCache.setup_user_cache() is not None:
+    if args.resume:
+        try:
             resume_cache = ResumeCache(ResumeCache.make_path(args))
-    except (IOError, OSError):
-        pass  # Couldn't open cache directory/file.  Continue without it.
-    except ResumeCacheConflict:
-        output_to.write(
-            "arv-put: Another process is already uploading this data.\n")
-        sys.exit(1)
+        except (IOError, OSError, ValueError):
+            pass  # Couldn't open cache directory/file.  Continue without it.
+        except ResumeCacheConflict:
+            print >>stderr, "\n".join([
+                "arv-put: Another process is already uploading this data.",
+                "         Use --no-resume if this is really what you want."])
+            sys.exit(1)

     if resume_cache is None:
         writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
     else:
-        if not args.resume:
-            resume_cache.restart()
         writer = ArvPutCollectionWriter.from_cache(
             resume_cache, reporter, bytes_expected)

@@ -364,11 +426,11 @@ def main(arguments=None, output_to=sys.stdout):
                             for sigcode in CAUGHT_SIGNALS}

     if writer.bytes_written > 0:  # We're resuming a previous upload.
-        print >>sys.stderr, "\n".join([
+        print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
-        writer.report_progress()
+    writer.report_progress()

     writer.do_queued_work()  # Do work resumed from cache.
     for path in args.paths:  # Copy file data to Keep.
         if os.path.isdir(path):
@@ -380,7 +442,7 @@ def main(arguments=None, output_to=sys.stdout):
         writer.finish_current_stream()

     if args.progress:  # Print newline to split stderr from stdout for humans.
-        print >>sys.stderr
+        print >>stderr

     if args.stream:
         output = writer.manifest_text()
@@ -388,23 +450,39 @@ def main(arguments=None, output_to=sys.stdout):
         output = ','.join(writer.data_locators())
     else:
         # Register the resulting collection in Arvados.
-        collection = arvados.api().collections().create(
+        collection = api_client.collections().create(
             body={
-                'uuid': writer.finish(),
                 'manifest_text': writer.manifest_text(),
+                'owner_uuid': project_link['tail_uuid']
                 },
             ).execute()

-        # Print the locator (uuid) of the new collection.
         output = collection['uuid']
-
-    output_to.write(output)
+        if project_link is not None:
+            # Update collection name
+            try:
+                if 'name' in collection:
+                    arvados.api().collections().update(uuid=output,
+                        body={"name": project_link["name"]}).execute()
+                else:
+                    create_project_link(output, project_link)
+            except apiclient.errors.Error as error:
+                print >>stderr, (
+                    "arv-put: Error adding Collection to project: {}.".format(
+                        error))
+                status = 1
+
+    # Print the locator (uuid) of the new collection.
+    stdout.write(output)
     if not output.endswith('\n'):
-        output_to.write('\n')
+        stdout.write('\n')

     for sigcode, orig_handler in orig_signal_handlers.items():
         signal.signal(sigcode, orig_handler)

+    if status != 0:
+        sys.exit(status)
+
     if resume_cache is not None:
         resume_cache.destroy()
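
For anyone trying out the options this patch introduces, here is a minimal usage sketch (not part of the patch itself). It assumes a reachable Arvados API server and an existing local file, and the project UUID is a hypothetical placeholder; main() is called with the patched signature main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):

    # Python 2, matching the print >>stderr idiom used in put.py.
    import StringIO
    from arvados.commands import put as arv_put

    out = StringIO.StringIO()
    err = StringIO.StringIO()
    arv_put.main(['--project-uuid', 'zzzzz-j7d0g-xxxxxxxxxxxxxxx',  # hypothetical project UUID
                  '--name', 'Example upload',
                  'example.txt'],                                   # an existing local file
                 stdout=out, stderr=err)
    print out.getvalue()  # the locator (uuid) of the new collection

If --name is omitted, prep_project_link() falls back to the generic "Saved at {time} by {username}@{host}" name and writes a notice to stderr.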