10467: Merge branch 'master' into 10467-client-disconnect
[arvados.git] / sdk / python / arvados / commands / put.py
index 0fc307a28f7cd5e8efcd42d37add89d3c4b29629..714281cc95b0475831f1761470c9cf1b5e91cce5 100644 (file)
@@ -374,6 +374,7 @@ class ArvPutUploadJob(object):
         self._file_paths = [] # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
+        self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
         self.logger = logger
@@ -393,8 +394,8 @@ class ArvPutUploadJob(object):
         """
         Start supporting thread & file uploading
         """
-        self._checkpointer.daemon = True
-        self._checkpointer.start()
+        if not self.dry_run:
+            self._checkpointer.start()
         try:
             for path in self.paths:
                 # Test for stdin first, in case some file named '-' exist
@@ -422,22 +423,29 @@ class ArvPutUploadJob(object):
             # there aren't any file to upload.
             if self.dry_run:
                 raise ArvPutUploadNotPending()
+            # Remove local_collection's files that don't exist locally anymore, so the
+            # bytes_written count is correct.
+            for f in self.collection_file_paths(self._local_collection,
+                                                path_prefix=""):
+                if f != 'stdin' and f != self.filename and not f in self._file_paths:
+                    self._local_collection.remove(f)
             # Update bytes_written from current local collection and
             # report initial progress.
             self._update()
             # Actual file upload
             self._upload_files()
         finally:
-            # Stop the thread before doing anything else
-            self._stop_checkpointer.set()
-            self._checkpointer.join()
-            # Commit all pending blocks & one last _update()
-            self._local_collection.manifest_text()
-            self._update(final=True)
+            if not self.dry_run:
+                # Stop the thread before doing anything else
+                self._stop_checkpointer.set()
+                self._checkpointer.join()
+                # Commit all pending blocks & one last _update()
+                self._local_collection.manifest_text()
+                self._update(final=True)
+                if save_collection:
+                    self.save_collection()
             if self.use_cache:
                 self._cache_file.close()
-            if save_collection:
-                self.save_collection()
 
     def save_collection(self):
         if self.update:
@@ -657,6 +665,17 @@ class ArvPutUploadJob(object):
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
 
+    def collection_file_paths(self, col, path_prefix='.'):
+        """Return a list of the file paths found by recursively walking the collection `col`, each joined onto `path_prefix`."""
+        file_paths = []
+        for name, item in col.items():  # items that are neither a file nor a subcollection are ignored
+            if isinstance(item, arvados.arvfile.ArvadosFile):
+                file_paths.append(os.path.join(path_prefix, name))
+            elif isinstance(item, arvados.collection.Subcollection):
+                new_prefix = os.path.join(path_prefix, name)  # nested paths are built relative to the subcollection
+                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
+        return file_paths
+
     def _lock_file(self, fileobj):
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
@@ -792,6 +811,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         if args.stream or args.raw:
             logger.error("Cannot use --name with --stream or --raw")
             sys.exit(1)
+        elif args.update_collection:
+            logger.error("Cannot use --name with --update-collection")
+            sys.exit(1)
         collection_name = args.name
     else:
         collection_name = "Saved at {} by {}@{}".format(
@@ -833,7 +855,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,
                                  update_collection = args.update_collection,
-                                 logger=logger)
+                                 logger=logger,
+                                 dry_run=args.dry_run)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -855,12 +878,13 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if not args.update_collection and args.resume and writer.bytes_written > 0:
+    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
             "arv-put: Resuming previous upload from last checkpoint.",
             "         Use the --no-resume option to start over."]))
 
-    writer.report_progress()
+    if not args.dry_run:
+        writer.report_progress()
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
@@ -868,9 +892,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
+    except ArvPutUploadIsPending:
+        # Dry run check successful, return proper exit code.
+        sys.exit(2)
+    except ArvPutUploadNotPending:
+        # No files pending for upload
+        sys.exit(0)
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
-        logger.error("\n")
+        logger.info("\n")
 
     if args.stream:
         if args.normalize: