Merge branch '20877-trashed-priority' refs #20877
[arvados.git] / sdk / python / arvados / commands / put.py
index 02af01a7e4eb52fb310e52003119d4d799ca14c1..0e732eafde87223a3b3c3522f4f02e089324d711 100644 (file)
@@ -444,7 +444,7 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 name=None, owner_uuid=None, api_client=None,
+                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
                  ensure_unique_name=False, num_retries=None,
                  put_threads=None, replication_desired=None, filename=None,
                  update_time=60.0, update_collection=None, storage_classes=None,
@@ -454,6 +454,7 @@ class ArvPutUploadJob(object):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
+        self.batch_mode = batch_mode
         self.update = False
         self.reporter = reporter
         # This will set to 0 before start counting, if no special files are going
@@ -575,6 +576,9 @@ class ArvPutUploadJob(object):
                     files.sort()
                     for f in files:
                         filepath = os.path.join(root, f)
+                        if not os.path.isfile(filepath):
+                            self.logger.warning("Skipping non-regular file '{}'".format(filepath))
+                            continue
                         # Add its size to the total bytes count (if applicable)
                         if self.follow_links or (not os.path.islink(filepath)):
                             if self.bytes_expected is not None:
@@ -914,12 +918,17 @@ class ArvPutUploadJob(object):
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             if not self._cached_manifest_valid():
-                raise ResumeCacheInvalidError()
+                if not self.batch_mode:
+                    raise ResumeCacheInvalidError()
+                else:
+                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
+                    self.use_cache = False # Don't overwrite preexisting cache file.
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(
                 self._state['manifest'],
                 replication_desired=self.replication_desired,
-                storage_classes_desired=(self.storage_classes or ['default']),
+                storage_classes_desired=self.storage_classes,
                 put_threads=self.put_threads,
                 api_client=self._api_client,
                 num_retries=self.num_retries)
@@ -1127,7 +1136,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     logging.getLogger('arvados').handlers[0].setFormatter(formatter)
 
     if api_client is None:
-        api_client = arvados.api('v1', request_id=request_id)
+        api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
 
     if install_sig_handlers:
         arv_cmd.install_signal_handlers()
@@ -1256,6 +1265,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
+                                 batch_mode= args.batch,
                                  filename = args.filename,
                                  reporter = reporter,
                                  api_client = api_client,
@@ -1284,7 +1294,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             "         or been created with another Arvados user's credentials.",
             "         Switch user or use one of the following options to restart upload:",
             "         --no-resume to start a new resume cache.",
-            "         --no-cache to disable resume cache."]))
+            "         --no-cache to disable resume cache.",
+            "         --batch to ignore the resume cache if invalid."]))
         sys.exit(1)
     except (CollectionUpdateError, PathDoesNotExistError) as error:
         logger.error("\n".join([