X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/20b2d6f7560e82add928fa14d868c9a4319d4907..dcb4db28681b6949a56a1de579891cb375c423fe:/sdk/python/arvados/commands/put.py diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py old mode 100644 new mode 100755 index d8ed90bda0..6bb1a0b217 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -5,6 +5,7 @@ import argparse import arvados +import arvados.collection import base64 import datetime import errno @@ -13,6 +14,7 @@ import hashlib import json import os import pwd +import time import signal import socket import sys @@ -166,7 +168,9 @@ def parse_arguments(arguments): args = arg_parser.parse_args(arguments) if len(args.paths) == 0: - args.paths += ['/dev/stdin'] + args.paths = ['-'] + + args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths) if len(args.paths) != 1 or os.path.isdir(args.paths[0]): if args.filename: @@ -181,9 +185,9 @@ def parse_arguments(arguments): args.progress = True if args.paths == ['-']: - args.paths = ['/dev/stdin'] + args.resume = False if not args.filename: - args.filename = '-' + args.filename = 'stdin' return args @@ -223,6 +227,23 @@ class ResumeCache(object): self.cache_file.seek(0) return json.load(self.cache_file) + def check_cache(self, api_client=None, num_retries=0): + try: + state = self.load() + locator = None + try: + if "_finished_streams" in state and len(state["_finished_streams"]) > 0: + locator = state["_finished_streams"][0][1][0] + elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0: + locator = state["_current_stream_locators"][0] + if locator is not None: + kc = arvados.keep.KeepClient(api_client=api_client) + kc.head(locator, num_retries=num_retries) + except Exception as e: + self.restart() + except (ValueError): + pass + def save(self, data): try: new_cache_fd, new_cache_name = tempfile.mkstemp( @@ -256,6 +277,311 @@ class ResumeCache(object): self.__init__(self.filename) +class ArvPutCollectionCache(object): + def __init__(self, paths): + md5 = hashlib.md5() + md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost')) + realpaths = sorted(os.path.realpath(path) for path in paths) + self.files = {} + for path in realpaths: + self._get_file_data(path) + # Only hash args paths + md5.update('\0'.join(realpaths)) + self.cache_hash = md5.hexdigest() + + self.cache_file = open(os.path.join( + arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'), + self.cache_hash), 'a+') + self._lock_file(self.cache_file) + self.filename = self.cache_file.name + self.data = self._load() + + def _load(self): + try: + self.cache_file.seek(0) + ret = json.load(self.cache_file) + except ValueError: + # File empty, set up new cache + ret = { + 'col_locator' : None, # Collection + 'uploaded' : {}, # Uploaded file list: {path : {size, mtime}} + } + return ret + + def _save(self): + """ + Atomically save + """ + # TODO: Should be a good idea to avoid _save() spamming? when writing + # lots of small files. + try: + new_cache_fd, new_cache_name = tempfile.mkstemp( + dir=os.path.dirname(self.filename)) + self._lock_file(new_cache_fd) + new_cache = os.fdopen(new_cache_fd, 'r+') + json.dump(self.data, new_cache) + os.rename(new_cache_name, self.filename) + except (IOError, OSError, ResumeCacheConflict) as error: + try: + os.unlink(new_cache_name) + except NameError: # mkstemp failed. + pass + else: + self.cache_file.close() + self.cache_file = new_cache + + def file_uploaded(self, path): + print "About to register an uploaded file: %s" % path + if path in self.files.keys(): + self.data['uploaded'][path] = self.files[path] + self._save() + print "Already registered the uploaded file!" + + def set_collection(self, loc): + self.data['col_locator'] = loc + self._save() + + def collection(self): + return self.data['col_locator'] + + def is_dirty(self, path): + if not path in self.data['uploaded'].keys(): + # Cannot be dirty is it wasn't even uploaded + return False + + if (self.files[path]['mtime'] != self.data['uploaded'][path]['mtime']) or (self.files[path]['size'] != self.data['uploaded'][path]['size']): + return True + else: + return False + + def dirty_files(self): + """ + Files that were previously uploaded but changed locally between + upload runs. These files should be re-uploaded. + """ + dirty = [] + for f in self.data['uploaded'].keys(): + if self.is_dirty(f): + dirty.append(f) + return dirty + + def uploaded_files(self): + """ + Files that were uploaded and have not changed locally between + upload runs. These files should be checked for partial uploads + """ + uploaded = [] + for f in self.data['uploaded'].keys(): + if not self.is_dirty(f): + uploaded.append(f) + return uploaded + + def pending_files(self): + """ + Files that should be uploaded, because of being dirty or that + never had the chance to be uploaded yet. + """ + pending = [] + uploaded = self.uploaded_files() + for f in self.files.keys(): + if f not in uploaded: + pending.append(f) + return pending + + def _get_file_data(self, path): + if os.path.isfile(path): + self.files[path] = {'mtime': os.path.getmtime(path), + 'size': os.path.getsize(path)} + elif os.path.isdir(path): + for item in os.listdir(path): + self._get_file_data(os.path.join(path, item)) + + def _lock_file(self, fileobj): + try: + fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError: + raise ResumeCacheConflict("{} locked".format(fileobj.name)) + + def close(self): + self.cache_file.close() + + def destroy(self): + try: + os.unlink(self.filename) + except OSError as error: + if error.errno != errno.ENOENT: # That's what we wanted anyway. + raise + self.close() + +class ArvPutUploader(object): + def __init__(self, paths, reporter=None): + expected_bytes = expected_bytes_for(paths) + self.cache = ArvPutCollectionCache(paths) + self.paths = paths + self.already_uploaded = False + # if self.cache.collection() is not None: + # self.collection = ArvPutCollection( + # locator=self.cache.collection(), + # cache=self.cache, + # reporter=reporter, + # bytes_expected=expected_bytes) + # else: + self.collection = ArvPutCollection( + cache=self.cache, + reporter=reporter, + bytes_expected=expected_bytes) + # self.cache.set_collection(self.collection.manifest_locator()) + + def do_upload(self): + if not self.already_uploaded: + for p in paths: + if os.path.isdir(p): + self.collection.write_directory_tree(p) + elif os.path.isfile(p): + self.collection.write_file(p, os.path.basename(p)) + self.cache.destroy() + self.already_uploaded = True + + def manifest(self): + return self.collection.manifest() + + def bytes_written(self): + return self.collection.bytes_written + + +class ArvPutCollection(object): + def __init__(self, cache=None, reporter=None, bytes_expected=None, + name=None, owner_uuid=None, ensure_unique_name=False, + num_retries=None, replication=None): + self.collection_flush_time = 60 # Secs + self.bytes_written = 0 + self.cache = cache + self.reporter = reporter + self.num_retries=num_retries + self.bytes_expected = bytes_expected + + locator = self.cache.collection() if self.cache else None + + if locator is None: + self.collection = arvados.collection.Collection() + self.collection.save_new(name=name, owner_uuid=owner_uuid, + ensure_unique_name=ensure_unique_name, + num_retries=num_retries) + if self.cache: + self.cache.set_collection(self.collection.manifest_locator()) + else: + self.collection = arvados.collection.Collection(locator) + + def save(self): + self.collection.save(num_retries=self.num_retries) + + def manifest_locator(self): + return self.collection.manifest_locator() + + def portable_data_hash(self): + return self.collectin.portable_data_hash() + + def manifest_text(self, stream_name=".", strip=False, normalize=False): + return self.collection.manifest_text(stream_name, strip, normalize) + + def _write(self, source_fd, output, first_block=True): + start_time = time.time() + while True: + data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE) + if not data: + break + output.write(data) + output.flush() # Commit block to Keep + self.bytes_written += len(data) + # Is it time to update the collection? + if (time.time() - start_time) > self.collection_flush_time: + self.collection.save(num_retries=self.num_retries) + start_time = time.time() + # Once a block is written on each file, mark it as uploaded on the cache + if first_block: + if self.cache: + self.cache.file_uploaded(source_fd.name) + self.collection.save(num_retries=self.num_retries) + print "FLUSHED COLLECTION!!!" + first_block = False + self.report_progress() + + def write_stdin(self, filename): + with self.collection as c: + output = c.open(filename, 'w') + self._write(sys.stdin, output) + output.close() + self.collection.save() + + def write_file(self, source, filename): + if self.cache and source in self.cache.dirty_files(): + print "DIRTY: Removing file %s from collection to be uploaded again" % source + self.collection.remove(filename) + + resume_offset = 0 + resume_upload = False + try: + print "FIND file %s" % filename + collection_file = self.collection.find(filename) + except IOError: + # Not found + collection_file = None + + if collection_file: + print "File %s already in the collection, checking!" % source + if os.path.getsize(source) == collection_file.size(): + print "WARNING: file %s already uploaded, skipping!" % source + # File already there, skip it. + self.bytes_written += os.path.getsize(source) + return + elif os.path.getsize(source) > collection_file.size(): + print "WARNING: RESUMING file %s" % source + # File partially uploaded, resume! + resume_upload = True + resume_offset = collection_file.size() + self.bytes_written += resume_offset + else: + # Source file smaller than uploaded file, what happened here? + # TODO: Raise exception of some kind? + return + + with open(source, 'r') as source_fd: + with self.collection as c: + if resume_upload: + print "Resuming file, source: %s, filename: %s" % (source, filename) + output = c.open(filename, 'a') + source_fd.seek(resume_offset) + first_block = False + else: + print "Writing file, source: %s, filename: %s" % (source, filename) + output = c.open(filename, 'w') + first_block = True + + self._write(source_fd, output, first_block) + output.close() + self.collection.save() # One last save... + + def write_directory_tree(self, path, stream_name='.'): + if os.path.isdir(path): + for item in os.listdir(path): + print "Checking path: '%s' - stream_name: '%s'" % (path, stream_name) + if os.path.isdir(os.path.join(path, item)): + self.write_directory_tree(os.path.join(path, item), + os.path.join(stream_name, item)) + else: + self.write_file(os.path.join(path, item), + os.path.join(stream_name, item)) + + def manifest(self): + print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE + print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.manifest_locator(), self.collection.manifest_text()) + return True + + def report_progress(self): + if self.reporter is not None: + self.reporter(self.bytes_written, self.bytes_expected) + + class ArvPutCollectionWriter(arvados.ResumableCollectionWriter): STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS + ['bytes_written', '_seen_inputs']) @@ -279,7 +605,9 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter): replication=replication) except (TypeError, ValueError, arvados.errors.StaleWriterStateError) as error: - return cls(cache, reporter, bytes_expected, num_retries=num_retries) + return cls(cache, reporter, bytes_expected, + num_retries=num_retries, + replication=replication) else: return writer @@ -376,6 +704,172 @@ def desired_project_uuid(api_client, project_uuid, num_retries): raise ValueError("Not a valid project UUID: {}".format(project_uuid)) return query.execute(num_retries=num_retries)['uuid'] +def main_new(arguments=None, stdout=sys.stdout, stderr=sys.stderr): + global api_client + + args = parse_arguments(arguments) + status = 0 + if api_client is None: + api_client = arvados.api('v1') + + # Determine the name to use + if args.name: + if args.stream or args.raw: + print >>stderr, "Cannot use --name with --stream or --raw" + sys.exit(1) + collection_name = args.name + else: + collection_name = "Saved at {} by {}@{}".format( + datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"), + pwd.getpwuid(os.getuid()).pw_name, + socket.gethostname()) + + if args.project_uuid and (args.stream or args.raw): + print >>stderr, "Cannot use --project-uuid with --stream or --raw" + sys.exit(1) + + # Determine the parent project + try: + project_uuid = desired_project_uuid(api_client, args.project_uuid, + args.retries) + except (apiclient_errors.Error, ValueError) as error: + print >>stderr, error + sys.exit(1) + + # write_copies diverges from args.replication here. + # args.replication is how many copies we will instruct Arvados to + # maintain (by passing it in collections().create()) after all + # data is written -- and if None was given, we'll use None there. + # Meanwhile, write_copies is how many copies of each data block we + # write to Keep, which has to be a number. + # + # If we simply changed args.replication from None to a default + # here, we'd end up erroneously passing the default replication + # level (instead of None) to collections().create(). + write_copies = (args.replication or + api_client._rootDesc.get('defaultCollectionReplication', 2)) + + if args.progress: + reporter = progress_writer(human_progress) + elif args.batch_progress: + reporter = progress_writer(machine_progress) + else: + reporter = None + bytes_expected = expected_bytes_for(args.paths) + + resume_cache = None + if args.resume: + try: + resume_cache = ArvPutCollectionCache(args.paths) + except (IOError, OSError, ValueError): + pass # Couldn't open cache directory/file. Continue without it. + except ResumeCacheConflict: + print >>stderr, "\n".join([ + "arv-put: Another process is already uploading this data.", + " Use --no-resume if this is really what you want."]) + sys.exit(1) + + if args.stream or args.raw: + writer = ArvPutCollection(cache=resume_cache, + reporter=reporter, + bytes_expected=bytes_expected, + num_retries=args.retries, + replication=write_copies) + else: + writer = ArvPutCollection(cache=resume_cache, + reporter=reporter, + bytes_expected=bytes_expected, + num_retries=args.retries, + replication=write_copies, + name=collection_name, + owner_uuid=project_uuid, + ensure_unique_name=True) + + # Install our signal handler for each code in CAUGHT_SIGNALS, and save + # the originals. + orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler) + for sigcode in CAUGHT_SIGNALS} + + if writer.bytes_written > 0: # We're resuming a previous upload. TODO + print >>stderr, "\n".join([ + "arv-put: Resuming previous upload from last checkpoint.", + " Use the --no-resume option to start over."]) + + writer.report_progress() + for path in args.paths: # Copy file data to Keep. + if path == '-': + writer.write_stdin(args.filename) + elif os.path.isdir(path): + writer.write_directory_tree(path)#, os.path.join('.', os.path.basename(path))) TODO: Check what happens with multiple directories params + else: + writer.write_file(path, args.filename or os.path.basename(path)) + + if args.progress: # Print newline to split stderr from stdout for humans. + print >>stderr + + output = None + if args.stream: + if args.normalize: + output = writer.manifest_text(normalize=True) + else: + output = writer.manifest_text() + elif args.raw: + output = ','.join(writer.data_locators()) # TODO + else: + try: + # manifest_text = writer.manifest_text() + # if args.normalize: + # manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True) + # replication_attr = 'replication_desired' + # if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None: + # # API called it 'redundancy' before #3410. + # replication_attr = 'redundancy' + # # Register the resulting collection in Arvados. + # collection = api_client.collections().create( + # body={ + # 'owner_uuid': project_uuid, + # 'name': collection_name, + # 'manifest_text': manifest_text, + # replication_attr: args.replication, + # }, + # ensure_unique_name=True + # ).execute(num_retries=args.retries) + # + # print >>stderr, "Collection saved as '%s'" % collection['name'] + # + writer.save() + if args.portable_data_hash: + output = writer.portable_data_hash() + else: + output = writer.manifest_locator() + with open('/tmp/lucas.txt', 'w') as f: + f.write(output) + + except apiclient_errors.Error as error: + print >>stderr, ( + "arv-put: Error creating Collection on project: {}.".format( + error)) + status = 1 + + # Print the locator (uuid) of the new collection. + if output is None: + status = status or 1 + else: + stdout.write(output) + if not output.endswith('\n'): + stdout.write('\n') + + for sigcode, orig_handler in orig_signal_handlers.items(): + signal.signal(sigcode, orig_handler) + + if status != 0: + sys.exit(status) + + if resume_cache is not None: + resume_cache.destroy() + + return output + def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): global api_client @@ -433,6 +927,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): if args.resume: try: resume_cache = ResumeCache(ResumeCache.make_path(args)) + resume_cache.check_cache(api_client=api_client, num_retries=args.retries) except (IOError, OSError, ValueError): pass # Couldn't open cache directory/file. Continue without it. except ResumeCacheConflict: @@ -465,7 +960,16 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): writer.report_progress() writer.do_queued_work() # Do work resumed from cache. for path in args.paths: # Copy file data to Keep. - if os.path.isdir(path): + if path == '-': + writer.start_new_stream() + writer.start_new_file(args.filename) + r = sys.stdin.read(64*1024) + while r: + # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get + # CollectionWriter.write(). + super(arvados.collection.ResumableCollectionWriter, writer).write(r) + r = sys.stdin.read(64*1024) + elif os.path.isdir(path): writer.write_directory_tree( path, max_manifest_depth=args.max_manifest_depth) else: @@ -476,17 +980,18 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): if args.progress: # Print newline to split stderr from stdout for humans. print >>stderr + output = None if args.stream: output = writer.manifest_text() if args.normalize: - output = CollectionReader(output).manifest_text(normalize=True) + output = arvados.collection.CollectionReader(output).manifest_text(normalize=True) elif args.raw: output = ','.join(writer.data_locators()) else: try: manifest_text = writer.manifest_text() if args.normalize: - manifest_text = CollectionReader(manifest_text).manifest_text(normalize=True) + manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True) replication_attr = 'replication_desired' if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None: # API called it 'redundancy' before #3410. @@ -516,9 +1021,12 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): status = 1 # Print the locator (uuid) of the new collection. - stdout.write(output) - if not output.endswith('\n'): - stdout.write('\n') + if output is None: + status = status or 1 + else: + stdout.write(output) + if not output.endswith('\n'): + stdout.write('\n') for sigcode, orig_handler in orig_signal_handlers.items(): signal.signal(sigcode, orig_handler) @@ -532,4 +1040,4 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): return output if __name__ == '__main__': - main() + main_new()