Merge branch '20877-trashed-priority' refs #20877
sdk/python/arvados/commands/put.py
index b219def55381d31726d54bb0cf299b355cfc0032..0e732eafde87223a3b3c3522f4f02e089324d711 100644
@@ -10,6 +10,7 @@ import argparse
 import arvados
 import arvados.collection
 import base64
+import ciso8601
 import copy
 import datetime
 import errno
@@ -76,8 +77,7 @@ Synonym for --stream.
 _group.add_argument('--stream', action='store_true',
                     help="""
 Store the file content and display the resulting manifest on
-stdout. Do not write the manifest to Keep or save a Collection object
-in Arvados.
+stdout. Do not save a Collection object in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
@@ -160,7 +160,7 @@ Exclude files and directories whose names match the given glob pattern. When
 using a path-like pattern like 'subdir/*.txt', all text files inside the 'subdir'
 directory, relative to the provided input dirs, will be excluded.
 When using a filename pattern like '*.txt', any text file will be excluded
-no matter where is placed.
+no matter where it is placed.
 For the special case of needing to exclude only files or dirs directly below
 the given input directory, you can use a pattern like './exclude_this.gif'.
 You can specify multiple patterns by using this argument more than once.
@@ -173,7 +173,8 @@ Follow file and directory symlinks (default).
 """)
 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                     help="""
 """)
 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                     help="""
-Do not follow file and directory symlinks.
+Ignore file and directory symlinks. Even paths given explicitly on the
+command line will be skipped if they are symlinks.
 """)
 
 
 """)
 
 
@@ -214,6 +215,12 @@ Do not print any debug messages to console. (Any error messages will
 still be displayed.)
 """)
 
+run_opts.add_argument('--batch', action='store_true', default=False,
+                      help="""
+Retry with '--no-resume --no-cache' if the cached state contains invalid/expired
+block signatures.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
                     help="""
@@ -234,6 +241,19 @@ _group.add_argument('--no-cache', action='store_false', dest='use_cache',
 Do not save upload state in a cache file for resuming.
 """)
 
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
+                    help="""
+Set the trash date of the resulting collection to an absolute date in the future.
+The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.
+Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
+""")
+_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
+                    help="""
+Set the trash date of the resulting collection to the given number of days after
+the date/time the upload process finishes.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -246,9 +266,8 @@ def parse_arguments(arguments):
 
     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 
-    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
-        if args.filename:
-            arg_parser.error("""
+    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
+        arg_parser.error("""
     --filename argument cannot be used when storing a directory or
     multiple files.
     """)
@@ -425,15 +444,17 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 name=None, owner_uuid=None, api_client=None,
+                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
                  ensure_unique_name=False, num_retries=None,
                  put_threads=None, replication_desired=None, filename=None,
                  update_time=60.0, update_collection=None, storage_classes=None,
                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
-                 follow_links=True, exclude_paths=[], exclude_names=None):
+                 follow_links=True, exclude_paths=[], exclude_names=None,
+                 trash_at=None):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
+        self.batch_mode = batch_mode
         self.update = False
         self.reporter = reporter
         # This will set to 0 before start counting, if no special files are going
@@ -470,6 +491,13 @@ class ArvPutUploadJob(object):
         self.follow_links = follow_links
         self.exclude_paths = exclude_paths
         self.exclude_names = exclude_names
+        self._trash_at = trash_at
+
+        if self._trash_at is not None:
+            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
+                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
+            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
+                raise TypeError('provided trash_at datetime should be timezone-naive')
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
@@ -504,6 +532,9 @@ class ArvPutUploadJob(object):
                 self._write_stdin(self.filename or 'stdin')
             elif not os.path.exists(path):
                  raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
+            elif (not self.follow_links) and os.path.islink(path):
+                self.logger.warning("Skipping symlink '{}'".format(path))
+                continue
             elif os.path.isdir(path):
                 # Use absolute paths on cache index so CWD doesn't interfere
                 # with the caching logic.
@@ -545,6 +576,9 @@ class ArvPutUploadJob(object):
                     files.sort()
                     for f in files:
                         filepath = os.path.join(root, f)
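+                        # os.path.isfile() follows symlinks, so this also skips
+                        # broken symlinks, FIFOs, sockets, and device files.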
+                        if not os.path.isfile(filepath):
+                            self.logger.warning("Skipping non-regular file '{}'".format(filepath))
+                            continue
                         # Add its size to the total bytes count (if applicable)
                         if self.follow_links or (not os.path.islink(filepath)):
                             if self.bytes_expected is not None:
@@ -610,6 +644,17 @@ class ArvPutUploadJob(object):
             if self.use_cache:
                 self._cache_file.close()
 
+    def _collection_trash_at(self):
+        """
+        Returns the trash date that the collection should use at save time.
+        Takes into account absolute/relative trash_at values requested
+        by the user.
+        """
+        if type(self._trash_at) == datetime.timedelta:
+            # Get an absolute datetime for trash_at
+            return datetime.datetime.utcnow() + self._trash_at
+        return self._trash_at
+
     def save_collection(self):
         if self.update:
             # Check if files should be updated on the remote collection.
@@ -624,16 +669,17 @@ class ArvPutUploadJob(object):
                 else:
                     # The file already exists on the remote collection, skip it.
                     pass
-            self._remote_collection.save(storage_classes=self.storage_classes,
-                                         num_retries=self.num_retries)
+            self._remote_collection.save(num_retries=self.num_retries,
+                                         trash_at=self._collection_trash_at())
         else:
-            if self.storage_classes is None:
-                self.storage_classes = ['default']
+            if len(self._local_collection) == 0:
+                self.logger.warning("No files were uploaded, skipping collection creation.")
+                return
             self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
-                storage_classes=self.storage_classes,
                 ensure_unique_name=self.ensure_unique_name,
-                num_retries=self.num_retries)
+                num_retries=self.num_retries,
+                trash_at=self._collection_trash_at())
 
     def destroy_cache(self):
         if self.use_cache:
@@ -688,6 +734,15 @@ class ArvPutUploadJob(object):
                     self._save_state()
                 except Exception as e:
                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+            # Keep remote collection's trash_at attribute synced when using relative expire dates
+            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
+                try:
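+                    # _collection_trash_at() returns a timezone-naive UTC datetime
+                    # here, so formatting it with a literal 'Z' suffix is correct.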
+                    self._api_client.collections().update(
+                        uuid=self._remote_collection.manifest_locator(),
+                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
+                    ).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
@@ -699,7 +754,7 @@ class ArvPutUploadJob(object):
 
     def _write_stdin(self, filename):
         output = self._local_collection.open(filename, 'wb')
-        self._write(sys.stdin, output)
+        self._write(sys.stdin.buffer, output)
         output.close()
 
     def _check_file(self, source, filename):
@@ -823,7 +878,10 @@ class ArvPutUploadJob(object):
                                           update_collection):
             try:
                 self._remote_collection = arvados.collection.Collection(
-                    update_collection, api_client=self._api_client)
+                    update_collection,
+                    api_client=self._api_client,
+                    storage_classes_desired=self.storage_classes,
+                    num_retries=self.num_retries)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -860,13 +918,20 @@ class ArvPutUploadJob(object):
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             if not self._cached_manifest_valid():
-                raise ResumeCacheInvalidError()
+                if not self.batch_mode:
+                    raise ResumeCacheInvalidError()
+                else:
+                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
+                    self.use_cache = False # Don't overwrite preexisting cache file.
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(
                 self._state['manifest'],
                 replication_desired=self.replication_desired,
+                storage_classes_desired=self.storage_classes,
                 put_threads=self.put_threads,
-                api_client=self._api_client)
+                api_client=self._api_client,
+                num_retries=self.num_retries)
 
     def _cached_manifest_valid(self):
         """
@@ -958,6 +1023,9 @@ class ArvPutUploadJob(object):
     def collection_name(self):
         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
+    def collection_trash_at(self):
+        return self._my_collection().get_trash_at()
+
     def manifest_locator(self):
         return self._my_collection().manifest_locator()
 
@@ -1068,11 +1136,49 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     logging.getLogger('arvados').handlers[0].setFormatter(formatter)
 
     if api_client is None:
-        api_client = arvados.api('v1', request_id=request_id)
+        api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
 
     if install_sig_handlers:
         arv_cmd.install_signal_handlers()
 
+    # Trash arguments validation
+    trash_at = None
+    if args.trash_at is not None:
+        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
+        # make sure the user provides a complete YYYY-MM-DD date.
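+        # The (?P<dash>-?) group captures an optional separator and the
+        # (?P=dash) backreference forces the second separator to match it,
+        # so mixed forms like '2009-0103' are rejected.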
+        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        # Check if no time information was provided. In that case, assume end-of-day.
+        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
+            args.trash_at += 'T23:59:59'
+        try:
+            trash_at = ciso8601.parse_datetime(args.trash_at)
+        except ValueError:  # ciso8601 raises ValueError on unparseable input
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        else:
+            if trash_at.tzinfo is not None:
+                # Timezone-aware datetime provided.
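+                # utcoffset() is the offset *east* of UTC (local = UTC + offset),
+                # so it is negated here and added back below to obtain UTC.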
+                utcoffset = -trash_at.utcoffset()
+            else:
+                # Timezone-naive datetime provided. Assume it is local.
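+                # time.altzone/time.timezone are seconds *west* of UTC, so
+                # adding them converts local wall-clock time to UTC.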
+                if time.daylight:
+                    utcoffset = datetime.timedelta(seconds=time.altzone)
+                else:
+                    utcoffset = datetime.timedelta(seconds=time.timezone)
+            # Convert to a timezone-naive UTC datetime.
+            trash_at = trash_at.replace(tzinfo=None) + utcoffset
+
+        if trash_at <= datetime.datetime.utcnow():
+            logger.error("--trash-at argument must be set in the future")
+            sys.exit(1)
+    if args.trash_after is not None:
+        if args.trash_after < 1:
+            logger.error("--trash-after argument must be >= 1")
+            sys.exit(1)
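+        # Equivalent to datetime.timedelta(days=args.trash_after).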
+        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
+
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
@@ -1110,11 +1216,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     #  Split storage-classes argument
     storage_classes = None
     if args.storage_classes:
-        storage_classes = args.storage_classes.strip().split(',')
-        if len(storage_classes) > 1:
-            logger.error("Multiple storage classes are not supported currently.")
-            sys.exit(1)
-
+        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
 
     # Setup exclude regex from all the --exclude arguments provided
     name_patterns = []
@@ -1163,6 +1265,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
+                                 batch_mode= args.batch,
                                  filename = args.filename,
                                  reporter = reporter,
                                  api_client = api_client,
@@ -1178,7 +1281,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
                                  dry_run=args.dry_run,
                                  follow_links=args.follow_links,
                                  exclude_paths=exclude_paths,
-                                 exclude_names=exclude_names)
+                                 exclude_names=exclude_names,
+                                 trash_at=trash_at)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -1190,9 +1294,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             "         or been created with another Arvados user's credentials.",
             "         Switch user or use one of the following options to restart upload:",
             "         --no-resume to start a new resume cache.",
             "         or been created with another Arvados user's credentials.",
             "         Switch user or use one of the following options to restart upload:",
             "         --no-resume to start a new resume cache.",
-            "         --no-cache to disable resume cache."]))
+            "         --no-cache to disable resume cache.",
+            "         --batch to ignore the resume cache if invalid."]))
         sys.exit(1)
-    except CollectionUpdateError as error:
+    except (CollectionUpdateError, PathDoesNotExistError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
@@ -1202,10 +1307,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     except ArvPutUploadNotPending:
         # No files pending for upload
         sys.exit(0)
-    except PathDoesNotExistError as error:
-        logger.error("\n".join([
-            "arv-put: %s" % str(error)]))
-        sys.exit(1)
 
     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
@@ -1217,7 +1318,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
-    except arvados.errors.ApiError as error:
+    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
@@ -1232,12 +1333,23 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             output = writer.manifest_text()
     elif args.raw:
         output = ','.join(writer.data_locators())
-    else:
+    elif writer.manifest_locator() is not None:
         try:
+            expiration_notice = ""
+            if writer.collection_trash_at() is not None:
+                # Get the local timezone-naive version, and log it with timezone information.
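+                # collection_trash_at() is assumed to return a timezone-aware UTC
+                # datetime; subtracting the seconds-west offset yields local time.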
+                if time.daylight:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
+                else:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
+                expiration_notice = ". It will expire on {} {}.".format(
+                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
             if args.update_collection:
-                logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
+                logger.info(u"Collection updated: '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             else:
-                logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
+                logger.info(u"Collection saved as '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
@@ -1247,6 +1359,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
+    else:
+        status = 1
 
     # Print the locator (uuid) of the new collection.
     if output is None: