Merge branch '17936-arvput-batch-mode' into main. Closes #17936.
[arvados.git] / sdk / python / arvados / commands / put.py
index 813c6ef2978d4af6ce0a658496a7142314c48f5a..f6f85ba69619ba930cca9efd20d3b4f134f28527 100644 (file)
@@ -173,7 +173,8 @@ Follow file and directory symlinks (default).
 """)
 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                     help="""
-Do not follow file and directory symlinks.
+Ignore file and directory symlinks. Even paths given explicitly on the
+command line will be skipped if they are symlinks.
 """)
 
 
@@ -214,6 +215,12 @@ Do not print any debug messages to console. (Any error messages will
 still be displayed.)
 """)
 
+run_opts.add_argument('--batch', action='store_true', default=False,
+                      help="""
+Retries with '--no-resume --no-cache' if cached state contains invalid/expired
+block signatures.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
                     help="""
@@ -437,7 +444,7 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 name=None, owner_uuid=None, api_client=None,
+                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
                  ensure_unique_name=False, num_retries=None,
                  put_threads=None, replication_desired=None, filename=None,
                  update_time=60.0, update_collection=None, storage_classes=None,
@@ -447,6 +454,7 @@ class ArvPutUploadJob(object):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
+        self.batch_mode = batch_mode
         self.update = False
         self.reporter = reporter
         # This will set to 0 before start counting, if no special files are going
@@ -525,6 +533,7 @@ class ArvPutUploadJob(object):
             elif not os.path.exists(path):
                  raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
             elif (not self.follow_links) and os.path.islink(path):
+                self.logger.warning("Skipping symlink '{}'".format(path))
                 continue
             elif os.path.isdir(path):
                 # Use absolute paths on cache index so CWD doesn't interfere
@@ -660,6 +669,9 @@ class ArvPutUploadJob(object):
             self._remote_collection.save(num_retries=self.num_retries,
                                          trash_at=self._collection_trash_at())
         else:
+            if len(self._local_collection) == 0:
+                self.logger.warning("No files were uploaded, skipping collection creation.")
+                return
             self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
@@ -903,12 +915,17 @@ class ArvPutUploadJob(object):
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             if not self._cached_manifest_valid():
-                raise ResumeCacheInvalidError()
+                if not self.batch_mode:
+                    raise ResumeCacheInvalidError()
+                else:
+                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
+                    self.use_cache = False # Don't overwrite preexisting cache file.
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(
                 self._state['manifest'],
                 replication_desired=self.replication_desired,
-                storage_classes_desired=(self.storage_classes or ['default']),
+                storage_classes_desired=self.storage_classes,
                 put_threads=self.put_threads,
                 api_client=self._api_client,
                 num_retries=self.num_retries)
@@ -1245,6 +1262,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
+                                 batch_mode= args.batch,
                                  filename = args.filename,
                                  reporter = reporter,
                                  api_client = api_client,
@@ -1273,7 +1291,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             "         or been created with another Arvados user's credentials.",
             "         Switch user or use one of the following options to restart upload:",
             "         --no-resume to start a new resume cache.",
-            "         --no-cache to disable resume cache."]))
+            "         --no-cache to disable resume cache.",
+            "         --batch to ignore the resume cache if invalid."]))
         sys.exit(1)
     except (CollectionUpdateError, PathDoesNotExistError) as error:
         logger.error("\n".join([
@@ -1296,7 +1315,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
-    except arvados.errors.ApiError as error:
+    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
@@ -1311,7 +1330,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             output = writer.manifest_text()
     elif args.raw:
         output = ','.join(writer.data_locators())
-    else:
+    elif writer.manifest_locator() is not None:
         try:
             expiration_notice = ""
             if writer.collection_trash_at() is not None:
@@ -1337,6 +1356,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
+    else:
+        status = 1
 
     # Print the locator (uuid) of the new collection.
     if output is None: