X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/283154a1d4ebc745f03abeef96c0571d284d4a70..66c13b6055a363cb08197b8c5d040ed9a511c8ca:/sdk/python/arvados/commands/put.py diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py index 37974c0945..4a926c701c 100644 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -6,116 +6,156 @@ import argparse import arvados import base64 +import datetime import errno import fcntl import hashlib import json import os +import pwd +import signal +import socket import sys import tempfile +from apiclient import errors as apiclient_errors + +import arvados.commands._util as arv_cmd + +CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM] +api_client = None + +upload_opts = argparse.ArgumentParser(add_help=False) + +upload_opts.add_argument('paths', metavar='path', type=str, nargs='*', + help=""" +Local file or directory. Default: read from standard input. +""") + +upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N', + default=-1, help=""" +Maximum depth of directory tree to represent in the manifest +structure. A directory structure deeper than this will be represented +as a single stream in the manifest. If N=0, the manifest will contain +a single stream. Default: -1 (unlimited), i.e., exactly one manifest +stream per filesystem directory that contains files. +""") + +_group = upload_opts.add_mutually_exclusive_group() + +_group.add_argument('--as-stream', action='store_true', dest='stream', + help=""" +Synonym for --stream. +""") + +_group.add_argument('--stream', action='store_true', + help=""" +Store the file content and display the resulting manifest on +stdout. Do not write the manifest to Keep or save a Collection object +in Arvados. +""") + +_group.add_argument('--as-manifest', action='store_true', dest='manifest', + help=""" +Synonym for --manifest. +""") + +_group.add_argument('--in-manifest', action='store_true', dest='manifest', + help=""" +Synonym for --manifest. +""") + +_group.add_argument('--manifest', action='store_true', + help=""" +Store the file data and resulting manifest in Keep, save a Collection +object in Arvados, and display the manifest locator (Collection uuid) +on stdout. This is the default behavior. +""") + +_group.add_argument('--as-raw', action='store_true', dest='raw', + help=""" +Synonym for --raw. +""") + +_group.add_argument('--raw', action='store_true', + help=""" +Store the file content and display the data block locators on stdout, +separated by commas, with a trailing newline. Do not store a +manifest. +""") + +upload_opts.add_argument('--use-filename', type=str, default=None, + dest='filename', help=""" +Synonym for --filename. +""") + +upload_opts.add_argument('--filename', type=str, default=None, + help=""" +Use the given filename in the manifest, instead of the name of the +local file. This is useful when "-" or "/dev/stdin" is given as an +input file. It can be used only if there is exactly one path given and +it is not a directory. Implies --manifest. +""") + +upload_opts.add_argument('--portable-data-hash', action='store_true', + help=""" +Print the portable data hash instead of the Arvados UUID for the collection +created by the upload. +""") + +run_opts = argparse.ArgumentParser(add_help=False) + +run_opts.add_argument('--project-uuid', metavar='UUID', help=""" +Store the collection in the specified project, instead of your Home +project. +""") + +run_opts.add_argument('--name', help=""" +Save the collection with the specified name. +""") + +_group = run_opts.add_mutually_exclusive_group() +_group.add_argument('--progress', action='store_true', + help=""" +Display human-readable progress on stderr (bytes and, if possible, +percentage of total data size). This is the default behavior when +stderr is a tty. +""") + +_group.add_argument('--no-progress', action='store_true', + help=""" +Do not display human-readable progress on stderr, even if stderr is a +tty. +""") + +_group.add_argument('--batch-progress', action='store_true', + help=""" +Display machine-readable progress on stderr (bytes and, if known, +total data size). +""") + +_group = run_opts.add_mutually_exclusive_group() +_group.add_argument('--resume', action='store_true', default=True, + help=""" +Continue interrupted uploads from cached state (default). +""") +_group.add_argument('--no-resume', action='store_false', dest='resume', + help=""" +Do not continue interrupted uploads from cached state. +""") + +arg_parser = argparse.ArgumentParser( + description='Copy data from the local filesystem to Keep.', + parents=[upload_opts, run_opts, arv_cmd.retry_opt]) def parse_arguments(arguments): - parser = argparse.ArgumentParser( - description='Copy data from the local filesystem to Keep.') - - parser.add_argument('paths', metavar='path', type=str, nargs='*', - help=""" - Local file or directory. Default: read from standard input. - """) - - parser.add_argument('--max-manifest-depth', type=int, metavar='N', - default=-1, help=""" - Maximum depth of directory tree to represent in the manifest - structure. A directory structure deeper than this will be represented - as a single stream in the manifest. If N=0, the manifest will contain - a single stream. Default: -1 (unlimited), i.e., exactly one manifest - stream per filesystem directory that contains files. - """) - - group = parser.add_mutually_exclusive_group() - - group.add_argument('--as-stream', action='store_true', dest='stream', - help=""" - Synonym for --stream. - """) - - group.add_argument('--stream', action='store_true', - help=""" - Store the file content and display the resulting manifest on - stdout. Do not write the manifest to Keep or save a Collection object - in Arvados. - """) - - group.add_argument('--as-manifest', action='store_true', dest='manifest', - help=""" - Synonym for --manifest. - """) - - group.add_argument('--in-manifest', action='store_true', dest='manifest', - help=""" - Synonym for --manifest. - """) - - group.add_argument('--manifest', action='store_true', - help=""" - Store the file data and resulting manifest in Keep, save a Collection - object in Arvados, and display the manifest locator (Collection uuid) - on stdout. This is the default behavior. - """) - - group.add_argument('--as-raw', action='store_true', dest='raw', - help=""" - Synonym for --raw. - """) - - group.add_argument('--raw', action='store_true', - help=""" - Store the file content and display the data block locators on stdout, - separated by commas, with a trailing newline. Do not store a - manifest. - """) - - parser.add_argument('--use-filename', type=str, default=None, - dest='filename', help=""" - Synonym for --filename. - """) - - parser.add_argument('--filename', type=str, default=None, - help=""" - Use the given filename in the manifest, instead of the name of the - local file. This is useful when "-" or "/dev/stdin" is given as an - input file. It can be used only if there is exactly one path given and - it is not a directory. Implies --manifest. - """) - - group = parser.add_mutually_exclusive_group() - group.add_argument('--progress', action='store_true', - help=""" - Display human-readable progress on stderr (bytes and, if possible, - percentage of total data size). This is the default behavior when - stderr is a tty. - """) - - group.add_argument('--no-progress', action='store_true', - help=""" - Do not display human-readable progress on stderr, even if stderr is a - tty. - """) - - group.add_argument('--batch-progress', action='store_true', - help=""" - Display machine-readable progress on stderr (bytes and, if known, - total data size). - """) - - args = parser.parse_args(arguments) + args = arg_parser.parse_args(arguments) if len(args.paths) == 0: args.paths += ['/dev/stdin'] if len(args.paths) != 1 or os.path.isdir(args.paths[0]): if args.filename: - parser.error(""" + arg_parser.error(""" --filename argument cannot be used when storing a directory or multiple files. """) @@ -137,14 +177,10 @@ class ResumeCacheConflict(Exception): class ResumeCache(object): - CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put') + CACHE_DIR = '.cache/arvados/arv-put' def __init__(self, file_spec): - try: - self.cache_file = open(file_spec, 'a+') - except TypeError: - file_spec = self.make_path(file_spec) - self.cache_file = open(file_spec, 'a+') + self.cache_file = open(file_spec, 'a+') self._lock_file(self.cache_file) self.filename = self.cache_file.name @@ -153,12 +189,14 @@ class ResumeCache(object): md5 = hashlib.md5() md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost')) realpaths = sorted(os.path.realpath(path) for path in args.paths) - md5.update(''.join(realpaths)) + md5.update('\0'.join(realpaths)) if any(os.path.isdir(path) for path in realpaths): md5.update(str(max(args.max_manifest_depth, -1))) elif args.filename: md5.update(args.filename) - return os.path.join(cls.CACHE_DIR, md5.hexdigest()) + return os.path.join( + arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'), + md5.hexdigest()) def _lock_file(self, fileobj): try: @@ -198,32 +236,40 @@ class ResumeCache(object): raise self.close() + def restart(self): + self.destroy() + self.__init__(self.filename) + class ArvPutCollectionWriter(arvados.ResumableCollectionWriter): - def __init__(self, cache=None, reporter=None, bytes_expected=None): - self.__init_locals__(cache, reporter, bytes_expected) - super(ArvPutCollectionWriter, self).__init__() + STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS + + ['bytes_written', '_seen_inputs']) - def __init_locals__(self, cache, reporter, bytes_expected): - self.cache = cache - self.report_func = reporter + def __init__(self, cache=None, reporter=None, bytes_expected=None, + api_client=None, num_retries=0): self.bytes_written = 0 + self._seen_inputs = [] + self.cache = cache + self.reporter = reporter self.bytes_expected = bytes_expected + super(ArvPutCollectionWriter, self).__init__( + api_client, num_retries=num_retries) @classmethod - def from_cache(cls, cache, reporter=None, bytes_expected=None): + def from_cache(cls, cache, reporter=None, bytes_expected=None, + num_retries=0): try: state = cache.load() state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])] - writer = cls.from_state(state) + writer = cls.from_state(state, cache, reporter, bytes_expected, + num_retries=num_retries) except (TypeError, ValueError, arvados.errors.StaleWriterStateError) as error: - return cls(cache, reporter, bytes_expected) + return cls(cache, reporter, bytes_expected, num_retries=num_retries) else: - writer.__init_locals__(cache, reporter, bytes_expected) return writer - def checkpoint_state(self): + def cache_state(self): if self.cache is None: return state = self.dump_state() @@ -235,12 +281,38 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter): state[attr] = list(value) self.cache.save(state) + def report_progress(self): + if self.reporter is not None: + self.reporter(self.bytes_written, self.bytes_expected) + def flush_data(self): - bytes_buffered = self._data_buffer_len + start_buffer_len = self._data_buffer_len + start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE super(ArvPutCollectionWriter, self).flush_data() - self.bytes_written += (bytes_buffered - self._data_buffer_len) - if self.report_func is not None: - self.report_func(self.bytes_written, self.bytes_expected) + if self._data_buffer_len < start_buffer_len: # We actually PUT data. + self.bytes_written += (start_buffer_len - self._data_buffer_len) + self.report_progress() + if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count: + self.cache_state() + + def _record_new_input(self, input_type, source_name, dest_name): + # The key needs to be a list because that's what we'll get back + # from JSON deserialization. + key = [input_type, source_name, dest_name] + if key in self._seen_inputs: + return False + self._seen_inputs.append(key) + return True + + def write_file(self, source, filename=None): + if self._record_new_input('file', source, filename): + super(ArvPutCollectionWriter, self).write_file(source, filename) + + def write_directory_tree(self, + path, stream_name='.', max_manifest_depth=-1): + if self._record_new_input('directory', path, stream_name): + super(ArvPutCollectionWriter, self).write_directory_tree( + path, stream_name, max_manifest_depth) def expected_bytes_for(pathlist): @@ -265,9 +337,9 @@ def machine_progress(bytes_written, bytes_expected): def human_progress(bytes_written, bytes_expected): if bytes_expected: - return "\r{}M / {}M {:.1f}% ".format( + return "\r{}M / {}M {:.1%} ".format( bytes_written >> 20, bytes_expected >> 20, - bytes_written / bytes_expected) + float(bytes_written) / bytes_expected) else: return "\r{} ".format(bytes_written) @@ -276,8 +348,51 @@ def progress_writer(progress_func, outfile=sys.stderr): outfile.write(progress_func(bytes_written, bytes_expected)) return write_progress -def main(arguments=None): +def exit_signal_handler(sigcode, frame): + sys.exit(-sigcode) + +def desired_project_uuid(api_client, project_uuid, num_retries): + if not project_uuid: + query = api_client.users().current() + elif arvados.util.user_uuid_pattern.match(project_uuid): + query = api_client.users().get(uuid=project_uuid) + elif arvados.util.group_uuid_pattern.match(project_uuid): + query = api_client.groups().get(uuid=project_uuid) + else: + raise ValueError("Not a valid project UUID: {}".format(project_uuid)) + return query.execute(num_retries=num_retries)['uuid'] + +def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): + global api_client + args = parse_arguments(arguments) + status = 0 + if api_client is None: + api_client = arvados.api('v1') + + # Determine the name to use + if args.name: + if args.stream or args.raw: + print >>stderr, "Cannot use --name with --stream or --raw" + sys.exit(1) + collection_name = args.name + else: + collection_name = "Saved at {} by {}@{}".format( + datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"), + pwd.getpwuid(os.getuid()).pw_name, + socket.gethostname()) + + if args.project_uuid and (args.stream or args.raw): + print >>stderr, "Cannot use --project-uuid with --stream or --raw" + sys.exit(1) + + # Determine the parent project + try: + project_uuid = desired_project_uuid(api_client, args.project_uuid, + args.retries) + except (apiclient_errors.Error, ValueError) as error: + print >>stderr, error + sys.exit(1) if args.progress: reporter = progress_writer(human_progress) @@ -285,35 +400,95 @@ def main(arguments=None): reporter = progress_writer(machine_progress) else: reporter = None + bytes_expected = expected_bytes_for(args.paths) - writer = ArvPutCollectionWriter( - reporter=reporter, bytes_expected=expected_bytes_for(args.paths)) - - # Copy file data to Keep. - for path in args.paths: + resume_cache = None + if args.resume: + try: + resume_cache = ResumeCache(ResumeCache.make_path(args)) + except (IOError, OSError, ValueError): + pass # Couldn't open cache directory/file. Continue without it. + except ResumeCacheConflict: + print >>stderr, "\n".join([ + "arv-put: Another process is already uploading this data.", + " Use --no-resume if this is really what you want."]) + sys.exit(1) + + if resume_cache is None: + writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected, + num_retries=args.retries) + else: + writer = ArvPutCollectionWriter.from_cache( + resume_cache, reporter, bytes_expected, num_retries=args.retries) + + # Install our signal handler for each code in CAUGHT_SIGNALS, and save + # the originals. + orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler) + for sigcode in CAUGHT_SIGNALS} + + if writer.bytes_written > 0: # We're resuming a previous upload. + print >>stderr, "\n".join([ + "arv-put: Resuming previous upload from last checkpoint.", + " Use the --no-resume option to start over."]) + + writer.report_progress() + writer.do_queued_work() # Do work resumed from cache. + for path in args.paths: # Copy file data to Keep. if os.path.isdir(path): writer.write_directory_tree( path, max_manifest_depth=args.max_manifest_depth) else: writer.start_new_stream() writer.write_file(path, args.filename or os.path.basename(path)) + writer.finish_current_stream() + + if args.progress: # Print newline to split stderr from stdout for humans. + print >>stderr if args.stream: - print writer.manifest_text(), + output = writer.manifest_text() elif args.raw: - writer.finish_current_stream() - print ','.join(writer.data_locators()) + output = ','.join(writer.data_locators()) else: - # Register the resulting collection in Arvados. - arvados.api().collections().create( - body={ - 'uuid': writer.finish(), - 'manifest_text': writer.manifest_text(), - }, - ).execute() - - # Print the locator (uuid) of the new collection. - print writer.finish() + try: + # Register the resulting collection in Arvados. + collection = api_client.collections().create( + body={ + 'owner_uuid': project_uuid, + 'name': collection_name, + 'manifest_text': writer.manifest_text() + }, + ensure_unique_name=True + ).execute(num_retries=args.retries) + + print >>stderr, "Collection saved as '%s'" % collection['name'] + + if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']: + output = collection['portable_data_hash'] + else: + output = collection['uuid'] + + except apiclient_errors.Error as error: + print >>stderr, ( + "arv-put: Error creating Collection on project: {}.".format( + error)) + status = 1 + + # Print the locator (uuid) of the new collection. + stdout.write(output) + if not output.endswith('\n'): + stdout.write('\n') + + for sigcode, orig_handler in orig_signal_handlers.items(): + signal.signal(sigcode, orig_handler) + + if status != 0: + sys.exit(status) + + if resume_cache is not None: + resume_cache.destroy() + + return output if __name__ == '__main__': main()