14930: Adds expiration notice in local time at the end of the run.
[arvados.git] / sdk / python / arvados / commands / put.py
index b219def55381d31726d54bb0cf299b355cfc0032..4b04ad229e26f50042d5699659bd9ab2f6121623 100644 (file)
@@ -10,6 +10,7 @@ import argparse
 import arvados
 import arvados.collection
 import base64
 import arvados
 import arvados.collection
 import base64
+import ciso8601
 import copy
 import datetime
 import errno
 import copy
 import datetime
 import errno
@@ -160,7 +161,7 @@ Exclude files and directories whose names match the given glob pattern. When
 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
 directory, relative to the provided input dirs will be excluded.
 When using a filename pattern like '*.txt', any text file will be excluded
 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
 directory, relative to the provided input dirs will be excluded.
 When using a filename pattern like '*.txt', any text file will be excluded
-no matter where is placed.
+no matter where it is placed.
 For the special case of needing to exclude only files or dirs directly below
 the given input directory, you can use a pattern like './exclude_this.gif'.
 You can specify multiple patterns by using this argument more than once.
 For the special case of needing to exclude only files or dirs directly below
 the given input directory, you can use a pattern like './exclude_this.gif'.
 You can specify multiple patterns by using this argument more than once.
@@ -234,6 +235,19 @@ _group.add_argument('--no-cache', action='store_false', dest='use_cache',
 Do not save upload state in a cache file for resuming.
 """)
 
 Do not save upload state in a cache file for resuming.
 """)
 
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
+                    help="""
+Set the trash date of the resulting collection to an absolute date in the future.
+The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.
+Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
+""")
+_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
+                    help="""
+Set the trash date of the resulting collection to a number of days from the
+date/time that the upload process finishes.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -430,7 +444,8 @@ class ArvPutUploadJob(object):
                  put_threads=None, replication_desired=None, filename=None,
                  update_time=60.0, update_collection=None, storage_classes=None,
                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                  put_threads=None, replication_desired=None, filename=None,
                  update_time=60.0, update_collection=None, storage_classes=None,
                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
-                 follow_links=True, exclude_paths=[], exclude_names=None):
+                 follow_links=True, exclude_paths=[], exclude_names=None,
+                 trash_at=None):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
@@ -470,6 +485,13 @@ class ArvPutUploadJob(object):
         self.follow_links = follow_links
         self.exclude_paths = exclude_paths
         self.exclude_names = exclude_names
         self.follow_links = follow_links
         self.exclude_paths = exclude_paths
         self.exclude_names = exclude_names
+        self._trash_at = trash_at
+
+        if self._trash_at is not None:
+            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
+                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
+            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
+                raise TypeError('provided trash_at datetime should be timezone-naive')
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
@@ -610,6 +632,17 @@ class ArvPutUploadJob(object):
             if self.use_cache:
                 self._cache_file.close()
 
             if self.use_cache:
                 self._cache_file.close()
 
+    def _collection_trash_at(self):
+        """
+        Returns the trash date that the collection should use at save time.
+        Takes into account absolute/relative trash_at values requested
+        by the user.
+        """
+        if type(self._trash_at) == datetime.timedelta:
+            # Get an absolute datetime for trash_at
+            return datetime.datetime.utcnow() + self._trash_at
+        return self._trash_at
+
     def save_collection(self):
         if self.update:
             # Check if files should be updated on the remote collection.
     def save_collection(self):
         if self.update:
             # Check if files should be updated on the remote collection.
@@ -625,7 +658,8 @@ class ArvPutUploadJob(object):
                    # The file already exists on remote collection, skip it.
                     pass
             self._remote_collection.save(storage_classes=self.storage_classes,
                    # The file already exists on remote collection, skip it.
                     pass
             self._remote_collection.save(storage_classes=self.storage_classes,
-                                         num_retries=self.num_retries)
+                                         num_retries=self.num_retries,
+                                         trash_at=self._collection_trash_at())
         else:
             if self.storage_classes is None:
                 self.storage_classes = ['default']
         else:
             if self.storage_classes is None:
                 self.storage_classes = ['default']
@@ -633,7 +667,8 @@ class ArvPutUploadJob(object):
                 name=self.name, owner_uuid=self.owner_uuid,
                 storage_classes=self.storage_classes,
                 ensure_unique_name=self.ensure_unique_name,
                 name=self.name, owner_uuid=self.owner_uuid,
                 storage_classes=self.storage_classes,
                 ensure_unique_name=self.ensure_unique_name,
-                num_retries=self.num_retries)
+                num_retries=self.num_retries,
+                trash_at=self._collection_trash_at())
 
     def destroy_cache(self):
         if self.use_cache:
 
     def destroy_cache(self):
         if self.use_cache:
@@ -688,6 +723,15 @@ class ArvPutUploadJob(object):
                     self._save_state()
                 except Exception as e:
                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
                     self._save_state()
                 except Exception as e:
                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+            # Keep remote collection's trash_at attribute synced when using relative expire dates
+            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
+                try:
+                    self._api_client.collections().update(
+                        uuid=self._remote_collection.manifest_locator(),
+                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
+                    ).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
@@ -958,6 +1002,9 @@ class ArvPutUploadJob(object):
     def collection_name(self):
         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def collection_name(self):
         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
+    def collection_trash_at(self):
+        return self._my_collection().get_trash_at()
+
     def manifest_locator(self):
         return self._my_collection().manifest_locator()
 
     def manifest_locator(self):
         return self._my_collection().manifest_locator()
 
@@ -1073,6 +1120,41 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     if install_sig_handlers:
         arv_cmd.install_signal_handlers()
 
     if install_sig_handlers:
         arv_cmd.install_signal_handlers()
 
+    # Trash arguments validation
+    trash_at = None
+    if args.trash_at is not None:
+        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
+        # make sure the user provides a complete YYYY-MM-DD date.
+        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}(?P=dash)\d{2}', args.trash_at):
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        # Check if no time information was provided. In that case, assume end-of-day.
+        if re.match(r'^\d{4}(?P<dash>-?)\d{2}(?P=dash)\d{2}$', args.trash_at):
+            args.trash_at += 'T23:59:59'
+        try:
+            trash_at = ciso8601.parse_datetime(args.trash_at)
+        except:
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        else:
+            if trash_at.tzinfo is not None:
+                # Timezone aware datetime provided.
+                utcoffset = -trash_at.utcoffset()
+            else:
+                # Timezone-naive datetime provided. Assume it is local.
+                utcoffset = datetime.timedelta(seconds=time.timezone)
+            # Convert to UTC timezone naive datetime.
+            trash_at = trash_at.replace(tzinfo=None) + utcoffset
+
+        if trash_at <= datetime.datetime.utcnow():
+            logger.error("--trash-at argument must be set in the future")
+            sys.exit(1)
+    if args.trash_after is not None:
+        if args.trash_after < 1:
+            logger.error("--trash-after argument must be >= 1")
+            sys.exit(1)
+        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
+
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
@@ -1178,7 +1260,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
                                  dry_run=args.dry_run,
                                  follow_links=args.follow_links,
                                  exclude_paths=exclude_paths,
                                  dry_run=args.dry_run,
                                  follow_links=args.follow_links,
                                  exclude_paths=exclude_paths,
-                                 exclude_names=exclude_names)
+                                 exclude_names=exclude_names,
+                                 trash_at=trash_at)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -1192,7 +1275,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             "         --no-resume to start a new resume cache.",
             "         --no-cache to disable resume cache."]))
         sys.exit(1)
             "         --no-resume to start a new resume cache.",
             "         --no-cache to disable resume cache."]))
         sys.exit(1)
-    except CollectionUpdateError as error:
+    except (CollectionUpdateError, PathDoesNotExistError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
@@ -1202,10 +1285,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     except ArvPutUploadNotPending:
         # No files pending for upload
         sys.exit(0)
     except ArvPutUploadNotPending:
         # No files pending for upload
         sys.exit(0)
-    except PathDoesNotExistError as error:
-        logger.error("\n".join([
-            "arv-put: %s" % str(error)]))
-        sys.exit(1)
 
     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
 
     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
@@ -1234,10 +1313,18 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         output = ','.join(writer.data_locators())
     else:
         try:
         output = ','.join(writer.data_locators())
     else:
         try:
+            expiration_notice = ""
+            if writer.collection_trash_at() is not None:
+                # Get the local timezone-naive version, and log it with timezone information.
+                local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
+                expiration_notice = ". It will expire on {} {}.".format(
+                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
             if args.update_collection:
             if args.update_collection:
-                logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
+                logger.info(u"Collection updated: '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             else:
             else:
-                logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
+                logger.info(u"Collection saved as '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else: