X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9f2fccd1c01823a762044c8a73e6fa0f7ed9086b..ebb2559b3a09636ff687316bbe512e0e8a86b168:/sdk/python/arvados/commands/put.py diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py index 0fc307a28f..714281cc95 100644 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -374,6 +374,7 @@ class ArvPutUploadJob(object): self._file_paths = [] # Files to be updated in remote collection self._stop_checkpointer = threading.Event() self._checkpointer = threading.Thread(target=self._update_task) + self._checkpointer.daemon = True self._update_task_time = update_time # How many seconds wait between update runs self._files_to_upload = FileUploadList(dry_run=dry_run) self.logger = logger @@ -393,8 +394,8 @@ class ArvPutUploadJob(object): """ Start supporting thread & file uploading """ - self._checkpointer.daemon = True - self._checkpointer.start() + if not self.dry_run: + self._checkpointer.start() try: for path in self.paths: # Test for stdin first, in case some file named '-' exist @@ -422,22 +423,29 @@ class ArvPutUploadJob(object): # there aren't any file to upload. if self.dry_run: raise ArvPutUploadNotPending() + # Remove local_collection's files that don't exist locally anymore, so the + # bytes_written count is correct. + for f in self.collection_file_paths(self._local_collection, + path_prefix=""): + if f != 'stdin' and f != self.filename and not f in self._file_paths: + self._local_collection.remove(f) # Update bytes_written from current local collection and # report initial progress. self._update() # Actual file upload self._upload_files() finally: - # Stop the thread before doing anything else - self._stop_checkpointer.set() - self._checkpointer.join() - # Commit all pending blocks & one last _update() - self._local_collection.manifest_text() - self._update(final=True) + if not self.dry_run: + # Stop the thread before doing anything else + self._stop_checkpointer.set() + self._checkpointer.join() + # Commit all pending blocks & one last _update() + self._local_collection.manifest_text() + self._update(final=True) + if save_collection: + self.save_collection() if self.use_cache: self._cache_file.close() - if save_collection: - self.save_collection() def save_collection(self): if self.update: @@ -657,6 +665,17 @@ class ArvPutUploadJob(object): # Load the previous manifest so we can check if files were modified remotely. self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired) + def collection_file_paths(self, col, path_prefix='.'): + """Return a list of file paths by recursively go through the entire collection `col`""" + file_paths = [] + for name, item in col.items(): + if isinstance(item, arvados.arvfile.ArvadosFile): + file_paths.append(os.path.join(path_prefix, name)) + elif isinstance(item, arvados.collection.Subcollection): + new_prefix = os.path.join(path_prefix, name) + file_paths += self.collection_file_paths(item, path_prefix=new_prefix) + return file_paths + def _lock_file(self, fileobj): try: fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) @@ -792,6 +811,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): if args.stream or args.raw: logger.error("Cannot use --name with --stream or --raw") sys.exit(1) + elif args.update_collection: + logger.error("Cannot use --name with --update-collection") + sys.exit(1) collection_name = args.name else: collection_name = "Saved at {} by {}@{}".format( @@ -833,7 +855,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): owner_uuid = project_uuid, ensure_unique_name = True, update_collection = args.update_collection, - logger=logger) + logger=logger, + dry_run=args.dry_run) except ResumeCacheConflict: logger.error("\n".join([ "arv-put: Another process is already uploading this data.", @@ -855,12 +878,13 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler) for sigcode in CAUGHT_SIGNALS} - if not args.update_collection and args.resume and writer.bytes_written > 0: + if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0: logger.warning("\n".join([ "arv-put: Resuming previous upload from last checkpoint.", " Use the --no-resume option to start over."])) - writer.report_progress() + if not args.dry_run: + writer.report_progress() output = None try: writer.start(save_collection=not(args.stream or args.raw)) @@ -868,9 +892,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): logger.error("\n".join([ "arv-put: %s" % str(error)])) sys.exit(1) + except ArvPutUploadIsPending: + # Dry run check successful, return proper exit code. + sys.exit(2) + except ArvPutUploadNotPending: + # No files pending for upload + sys.exit(0) if args.progress: # Print newline to split stderr from stdout for humans. - logger.error("\n") + logger.info("\n") if args.stream: if args.normalize: