#!/usr/bin/env python # TODO: # --md5sum - display md5 of each file as read from disk import argparse import arvados import arvados.collection import base64 import copy import datetime import errno import fcntl import hashlib import json import logging import os import pwd import re import signal import socket import sys import tempfile import threading import time from apiclient import errors as apiclient_errors import arvados.commands._util as arv_cmd CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM] api_client = None upload_opts = argparse.ArgumentParser(add_help=False) upload_opts.add_argument('paths', metavar='path', type=str, nargs='*', help=""" Local file or directory. Default: read from standard input. """) _group = upload_opts.add_mutually_exclusive_group() _group.add_argument('--max-manifest-depth', type=int, metavar='N', default=-1, help=""" Maximum depth of directory tree to represent in the manifest structure. A directory structure deeper than this will be represented as a single stream in the manifest. If N=0, the manifest will contain a single stream. Default: -1 (unlimited), i.e., exactly one manifest stream per filesystem directory that contains files. """) _group.add_argument('--normalize', action='store_true', help=""" Normalize the manifest by re-ordering files and streams after writing data. """) _group = upload_opts.add_mutually_exclusive_group() _group.add_argument('--as-stream', action='store_true', dest='stream', help=""" Synonym for --stream. """) _group.add_argument('--stream', action='store_true', help=""" Store the file content and display the resulting manifest on stdout. Do not write the manifest to Keep or save a Collection object in Arvados. """) _group.add_argument('--as-manifest', action='store_true', dest='manifest', help=""" Synonym for --manifest. """) _group.add_argument('--in-manifest', action='store_true', dest='manifest', help=""" Synonym for --manifest. """) _group.add_argument('--manifest', action='store_true', help=""" Store the file data and resulting manifest in Keep, save a Collection object in Arvados, and display the manifest locator (Collection uuid) on stdout. This is the default behavior. """) _group.add_argument('--as-raw', action='store_true', dest='raw', help=""" Synonym for --raw. """) _group.add_argument('--raw', action='store_true', help=""" Store the file content and display the data block locators on stdout, separated by commas, with a trailing newline. Do not store a manifest. """) upload_opts.add_argument('--update-collection', type=str, default=None, dest='update_collection', metavar="UUID", help=""" Update an existing collection identified by the given Arvados collection UUID. All new local files will be uploaded. """) upload_opts.add_argument('--use-filename', type=str, default=None, dest='filename', help=""" Synonym for --filename. """) upload_opts.add_argument('--filename', type=str, default=None, help=""" Use the given filename in the manifest, instead of the name of the local file. This is useful when "-" or "/dev/stdin" is given as an input file. It can be used only if there is exactly one path given and it is not a directory. Implies --manifest. """) upload_opts.add_argument('--portable-data-hash', action='store_true', help=""" Print the portable data hash instead of the Arvados UUID for the collection created by the upload. """) upload_opts.add_argument('--replication', type=int, metavar='N', default=None, help=""" Set the replication level for the new collection: how many different physical storage devices (e.g., disks) should have a copy of each data block. Default is to use the server-provided default (if any) or 2. """) run_opts = argparse.ArgumentParser(add_help=False) run_opts.add_argument('--project-uuid', metavar='UUID', help=""" Store the collection in the specified project, instead of your Home project. """) run_opts.add_argument('--name', help=""" Save the collection with the specified name. """) _group = run_opts.add_mutually_exclusive_group() _group.add_argument('--progress', action='store_true', help=""" Display human-readable progress on stderr (bytes and, if possible, percentage of total data size). This is the default behavior when stderr is a tty. """) _group.add_argument('--no-progress', action='store_true', help=""" Do not display human-readable progress on stderr, even if stderr is a tty. """) _group.add_argument('--batch-progress', action='store_true', help=""" Display machine-readable progress on stderr (bytes and, if known, total data size). """) _group = run_opts.add_mutually_exclusive_group() _group.add_argument('--resume', action='store_true', default=True, help=""" Continue interrupted uploads from cached state (default). """) _group.add_argument('--no-resume', action='store_false', dest='resume', help=""" Do not continue interrupted uploads from cached state. """) arg_parser = argparse.ArgumentParser( description='Copy data from the local filesystem to Keep.', parents=[upload_opts, run_opts, arv_cmd.retry_opt]) def parse_arguments(arguments): args = arg_parser.parse_args(arguments) if len(args.paths) == 0: args.paths = ['-'] args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths) if len(args.paths) != 1 or os.path.isdir(args.paths[0]): if args.filename: arg_parser.error(""" --filename argument cannot be used when storing a directory or multiple files. """) # Turn on --progress by default if stderr is a tty. if (not (args.batch_progress or args.no_progress) and os.isatty(sys.stderr.fileno())): args.progress = True if args.paths == ['-']: if args.update_collection: arg_parser.error(""" --update-collection cannot be used when reading from stdin. """) args.resume = False if not args.filename: args.filename = 'stdin' return args class CollectionUpdateError(Exception): pass class ResumeCacheInvalid(Exception): pass class ResumeCacheConflict(Exception): pass class ResumeCache(object): CACHE_DIR = '.cache/arvados/arv-put' def __init__(self, file_spec): self.cache_file = open(file_spec, 'a+') self._lock_file(self.cache_file) self.filename = self.cache_file.name @classmethod def make_path(cls, args): md5 = hashlib.md5() md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost')) realpaths = sorted(os.path.realpath(path) for path in args.paths) md5.update('\0'.join(realpaths)) if any(os.path.isdir(path) for path in realpaths): md5.update(str(max(args.max_manifest_depth, -1))) elif args.filename: md5.update(args.filename) return os.path.join( arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'), md5.hexdigest()) def _lock_file(self, fileobj): try: fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: raise ResumeCacheConflict("{} locked".format(fileobj.name)) def load(self): self.cache_file.seek(0) return json.load(self.cache_file) def check_cache(self, api_client=None, num_retries=0): try: state = self.load() locator = None try: if "_finished_streams" in state and len(state["_finished_streams"]) > 0: locator = state["_finished_streams"][0][1][0] elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0: locator = state["_current_stream_locators"][0] if locator is not None: kc = arvados.keep.KeepClient(api_client=api_client) kc.head(locator, num_retries=num_retries) except Exception as e: self.restart() except (ValueError): pass def save(self, data): try: new_cache_fd, new_cache_name = tempfile.mkstemp( dir=os.path.dirname(self.filename)) self._lock_file(new_cache_fd) new_cache = os.fdopen(new_cache_fd, 'r+') json.dump(data, new_cache) os.rename(new_cache_name, self.filename) except (IOError, OSError, ResumeCacheConflict) as error: try: os.unlink(new_cache_name) except NameError: # mkstemp failed. pass else: self.cache_file.close() self.cache_file = new_cache def close(self): self.cache_file.close() def destroy(self): try: os.unlink(self.filename) except OSError as error: if error.errno != errno.ENOENT: # That's what we wanted anyway. raise self.close() def restart(self): self.destroy() self.__init__(self.filename) class ArvPutUploadJob(object): CACHE_DIR = '.cache/arvados/arv-put' EMPTY_STATE = { 'manifest' : None, # Last saved manifest checkpoint 'files' : {} # Previous run file list: {path : {size, mtime}} } def __init__(self, paths, resume=True, reporter=None, bytes_expected=None, name=None, owner_uuid=None, ensure_unique_name=False, num_retries=None, replication_desired=None, filename=None, update_time=1.0, update_collection=None): self.paths = paths self.resume = resume self.update = False self.reporter = reporter self.bytes_expected = bytes_expected self.bytes_written = 0 self.bytes_skipped = 0 self.name = name self.owner_uuid = owner_uuid self.ensure_unique_name = ensure_unique_name self.num_retries = num_retries self.replication_desired = replication_desired self.filename = filename self._state_lock = threading.Lock() self._state = None # Previous run state (file list & manifest) self._current_files = [] # Current run file list self._cache_file = None self._collection = None self._collection_lock = threading.Lock() self._local_collection = None # Previous run collection manifest self._file_paths = [] # Files to be updated in remote collection self._stop_checkpointer = threading.Event() self._checkpointer = threading.Thread(target=self._update_task) self._update_task_time = update_time # How many seconds wait between update runs self.logger = logging.getLogger('arvados.arv_put') # Load cached data if any and if needed self._setup_state(update_collection) def start(self, save_collection): """ Start supporting thread & file uploading """ self._checkpointer.daemon = True self._checkpointer.start() try: for path in self.paths: # Test for stdin first, in case some file named '-' exist if path == '-': self._write_stdin(self.filename or 'stdin') elif os.path.isdir(path): if path == '.' or path == './' or os.path.dirname(path) == '': dirname = '' else: dirname = os.path.dirname(path) + '/' for root, dirs, files in os.walk(path): # Make os.walk()'s dir traversing order deterministic dirs.sort() files.sort() for f in files: self._write_file(os.path.join(root, f), os.path.join(root[len(dirname):], f)) else: self._write_file(path, self.filename or os.path.basename(path)) finally: # Stop the thread before doing anything else self._stop_checkpointer.set() self._checkpointer.join() # Commit all & one last _update() self.manifest_text() if save_collection: self.save_collection() self._update() self._cache_file.close() # Correct the final written bytes count self.bytes_written -= self.bytes_skipped def save_collection(self): if self.update: # Check if files should be updated on the remote collection. for fp in self._file_paths: remote_file = self._collection.find(fp) if not remote_file: # File don't exist on remote collection, copy it. self._collection.copy(fp, fp, self._local_collection) elif remote_file != self._local_collection.find(fp): # A different file exist on remote collection, overwrite it. self._collection.copy(fp, fp, self._local_collection, overwrite=True) else: # The file already exist on remote collection, skip it. pass self._collection.save(num_retries=self.num_retries) else: self._my_collection().save_new( name=self.name, owner_uuid=self.owner_uuid, ensure_unique_name=self.ensure_unique_name, num_retries=self.num_retries) def destroy_cache(self): if self.resume: try: os.unlink(self._cache_filename) except OSError as error: # That's what we wanted anyway. if error.errno != errno.ENOENT: raise self._cache_file.close() def _collection_size(self, collection): """ Recursively get the total size of the collection """ size = 0 for item in collection.values(): if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection): size += self._collection_size(item) else: size += item.size() return size def _update_task(self): """ Periodically called support task. File uploading is asynchronous so we poll status from the collection. """ while not self._stop_checkpointer.wait(self._update_task_time): self._update() def _update(self): """ Update cached manifest text and report progress. """ with self._collection_lock: self.bytes_written = self._collection_size(self._local_collection) # Update cache, if resume enabled with self._state_lock: # Get the manifest text without comitting pending blocks self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True) self._save_state() # Call the reporter, if any self.report_progress() def report_progress(self): if self.reporter is not None: self.reporter(self.bytes_written, self.bytes_expected) def _write_stdin(self, filename): output = self._local_collection.open(filename, 'w') self._write(sys.stdin, output) output.close() def _write_file(self, source, filename): resume_offset = 0 should_upload = False new_file_in_cache = False # Record file path for updating the remote collection before exiting self._file_paths.append(filename) with self._state_lock: # If no previous cached data on this file, store it for an eventual # repeated run. if source not in self._state['files']: self._state['files'][source] = { 'mtime': os.path.getmtime(source), 'size' : os.path.getsize(source) } new_file_in_cache = True cached_file_data = self._state['files'][source] # Check if file was already uploaded (at least partially) file_in_local_collection = self._local_collection.find(filename) # If not resuming, upload the full file. if not self.resume: should_upload = True # New file detected from last run, upload it. elif new_file_in_cache: should_upload = True # Local file didn't change from last run. elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source): if not file_in_local_collection: # File not uploaded yet, upload it completely should_upload = True elif cached_file_data['size'] == file_in_local_collection.size(): # File already there, skip it. self.bytes_skipped += cached_file_data['size'] elif cached_file_data['size'] > file_in_local_collection.size(): # File partially uploaded, resume! resume_offset = file_in_local_collection.size() should_upload = True else: # Inconsistent cache, re-upload the file should_upload = True self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source)) # Local file differs from cached data, re-upload it. else: should_upload = True if should_upload: with open(source, 'r') as source_fd: with self._state_lock: self._state['files'][source]['mtime'] = os.path.getmtime(source) self._state['files'][source]['size'] = os.path.getsize(source) if resume_offset > 0: # Start upload where we left off output = self._local_collection.open(filename, 'a') source_fd.seek(resume_offset) self.bytes_skipped += resume_offset else: # Start from scratch output = self._local_collection.open(filename, 'w') self._write(source_fd, output) output.close(flush=False) def _write(self, source_fd, output): first_read = True while True: data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE) # Allow an empty file to be written if not data and not first_read: break if first_read: first_read = False output.write(data) def _my_collection(self): return self._local_collection def _setup_state(self, update_collection): """ Create a new cache file or load a previously existing one. """ # Load an already existing collection for update if update_collection and re.match(arvados.util.collection_uuid_pattern, update_collection): try: self._collection = arvados.collection.Collection(update_collection) except arvados.errors.ApiError as error: raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error)) else: self.update = True elif update_collection: # Collection locator provided, but unknown format raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection)) # Set up cache file name from input paths. md5 = hashlib.md5() md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost')) realpaths = sorted(os.path.realpath(path) for path in self.paths) md5.update('\0'.join(realpaths)) if self.filename: md5.update(self.filename) cache_filename = md5.hexdigest() self._cache_file = open(os.path.join( arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'), cache_filename), 'a+') self._cache_filename = self._cache_file.name self._lock_file(self._cache_file) self._cache_file.seek(0) with self._state_lock: try: self._state = json.load(self._cache_file) if not set(['manifest', 'files']).issubset(set(self._state.keys())): # Cache at least partially incomplete, set up new cache self._state = copy.deepcopy(self.EMPTY_STATE) except ValueError: # Cache file empty, set up new cache self._state = copy.deepcopy(self.EMPTY_STATE) # Load the previous manifest so we can check if files were modified remotely. self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired) # Load how many bytes were uploaded on previous run with self._collection_lock: self.bytes_written = self._collection_size(self._my_collection()) def _lock_file(self, fileobj): try: fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: raise ResumeCacheConflict("{} locked".format(fileobj.name)) def _save_state(self): """ Atomically save current state into cache. """ try: with self._state_lock: state = self._state new_cache_fd, new_cache_name = tempfile.mkstemp( dir=os.path.dirname(self._cache_filename)) self._lock_file(new_cache_fd) new_cache = os.fdopen(new_cache_fd, 'r+') json.dump(state, new_cache) new_cache.flush() os.fsync(new_cache) os.rename(new_cache_name, self._cache_filename) except (IOError, OSError, ResumeCacheConflict) as error: self.logger.error("There was a problem while saving the cache file: {}".format(error)) try: os.unlink(new_cache_name) except NameError: # mkstemp failed. pass else: self._cache_file.close() self._cache_file = new_cache def collection_name(self): with self._collection_lock: name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None return name def manifest_locator(self): locator = self._my_collection().manifest_locator() return locator def portable_data_hash(self): datahash = self._my_collection().portable_data_hash() return datahash def manifest_text(self, stream_name=".", strip=False, normalize=False): manifest = self._my_collection().manifest_text(stream_name, strip, normalize) return manifest def _datablocks_on_item(self, item): """ Return a list of datablock locators, recursively navigating through subcollections """ if isinstance(item, arvados.arvfile.ArvadosFile): if item.size() == 0: # Empty file locator return ["d41d8cd98f00b204e9800998ecf8427e+0"] else: locators = [] for segment in item.segments(): loc = segment.locator locators.append(loc) return locators elif isinstance(item, arvados.collection.Collection): l = [self._datablocks_on_item(x) for x in item.values()] # Fast list flattener method taken from: # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python return [loc for sublist in l for loc in sublist] else: return None def data_locators(self): with self._collection_lock: # Make sure all datablocks are flushed before getting the locators self._my_collection().manifest_text() datablocks = self._datablocks_on_item(self._my_collection()) return datablocks def expected_bytes_for(pathlist): # Walk the given directory trees and stat files, adding up file sizes, # so we can display progress as percent bytesum = 0 for path in pathlist: if os.path.isdir(path): for filename in arvados.util.listdir_recursive(path): bytesum += os.path.getsize(os.path.join(path, filename)) elif not os.path.isfile(path): return None else: bytesum += os.path.getsize(path) return bytesum _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0], os.getpid()) def machine_progress(bytes_written, bytes_expected): return _machine_format.format( bytes_written, -1 if (bytes_expected is None) else bytes_expected) def human_progress(bytes_written, bytes_expected): if bytes_expected: return "\r{}M / {}M {:.1%} ".format( bytes_written >> 20, bytes_expected >> 20, float(bytes_written) / bytes_expected) else: return "\r{} ".format(bytes_written) def progress_writer(progress_func, outfile=sys.stderr): def write_progress(bytes_written, bytes_expected): outfile.write(progress_func(bytes_written, bytes_expected)) return write_progress def exit_signal_handler(sigcode, frame): sys.exit(-sigcode) def desired_project_uuid(api_client, project_uuid, num_retries): if not project_uuid: query = api_client.users().current() elif arvados.util.user_uuid_pattern.match(project_uuid): query = api_client.users().get(uuid=project_uuid) elif arvados.util.group_uuid_pattern.match(project_uuid): query = api_client.groups().get(uuid=project_uuid) else: raise ValueError("Not a valid project UUID: {}".format(project_uuid)) return query.execute(num_retries=num_retries)['uuid'] def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): global api_client args = parse_arguments(arguments) status = 0 if api_client is None: api_client = arvados.api('v1') # Determine the name to use if args.name: if args.stream or args.raw: print >>stderr, "Cannot use --name with --stream or --raw" sys.exit(1) collection_name = args.name else: collection_name = "Saved at {} by {}@{}".format( datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"), pwd.getpwuid(os.getuid()).pw_name, socket.gethostname()) if args.project_uuid and (args.stream or args.raw): print >>stderr, "Cannot use --project-uuid with --stream or --raw" sys.exit(1) # Determine the parent project try: project_uuid = desired_project_uuid(api_client, args.project_uuid, args.retries) except (apiclient_errors.Error, ValueError) as error: print >>stderr, error sys.exit(1) if args.progress: reporter = progress_writer(human_progress) elif args.batch_progress: reporter = progress_writer(machine_progress) else: reporter = None bytes_expected = expected_bytes_for(args.paths) try: writer = ArvPutUploadJob(paths = args.paths, resume = args.resume, filename = args.filename, reporter = reporter, bytes_expected = bytes_expected, num_retries = args.retries, replication_desired = args.replication, name = collection_name, owner_uuid = project_uuid, ensure_unique_name = True, update_collection = args.update_collection) except ResumeCacheConflict: print >>stderr, "\n".join([ "arv-put: Another process is already uploading this data.", " Use --no-resume if this is really what you want."]) sys.exit(1) except ResumeCacheInvalid as error: print >>stderr, "\n".join([ "arv-put: %s" % str(error), " Use --no-resume or delete/move the cache file to upload to a new collection.", " Use --update-collection otherwise."]) sys.exit(1) except CollectionUpdateError as error: print >>stderr, "\n".join([ "arv-put: %s" % str(error)]) sys.exit(1) # Install our signal handler for each code in CAUGHT_SIGNALS, and save # the originals. orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler) for sigcode in CAUGHT_SIGNALS} if not args.update_collection and args.resume and writer.bytes_written > 0: print >>stderr, "\n".join([ "arv-put: Resuming previous upload from last checkpoint.", " Use the --no-resume option to start over."]) writer.report_progress() output = None try: writer.start(save_collection=not(args.stream or args.raw)) except arvados.errors.ApiError as error: print >>stderr, "\n".join([ "arv-put: %s" % str(error)]) sys.exit(1) if args.progress: # Print newline to split stderr from stdout for humans. print >>stderr if args.stream: if args.normalize: output = writer.manifest_text(normalize=True) else: output = writer.manifest_text() elif args.raw: output = ','.join(writer.data_locators()) else: try: if args.update_collection: print >>stderr, "Collection updated: '{}'".format(writer.collection_name()) else: print >>stderr, "Collection saved as '{}'".format(writer.collection_name()) if args.portable_data_hash: output = writer.portable_data_hash() else: output = writer.manifest_locator() except apiclient_errors.Error as error: print >>stderr, ( "arv-put: Error creating Collection on project: {}.".format( error)) status = 1 # Print the locator (uuid) of the new collection. if output is None: status = status or 1 else: stdout.write(output) if not output.endswith('\n'): stdout.write('\n') for sigcode, orig_handler in orig_signal_handlers.items(): signal.signal(sigcode, orig_handler) if status != 0: sys.exit(status) # Success! return output if __name__ == '__main__': main()