Merge branch '17833-arvput-cleaner-errors'
[arvados.git] / sdk / python / arvados / commands / put.py
index aa8467cb95e435a330189e92eb7dbc178277b0ff..ad04807712dd00d7e89e478be1a7eb2c01d0b057 100644 (file)
@@ -77,8 +77,7 @@ Synonym for --stream.
 _group.add_argument('--stream', action='store_true',
                     help="""
 Store the file content and display the resulting manifest on
-stdout. Do not write the manifest to Keep or save a Collection object
-in Arvados.
+stdout. Do not save a Collection object in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
@@ -174,7 +173,8 @@ Follow file and directory symlinks (default).
 """)
 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                     help="""
-Do not follow file and directory symlinks.
+Ignore file and directory symlinks. Even paths given explicitly on the
+command line will be skipped if they are symlinks.
 """)
 
 
@@ -236,10 +236,11 @@ Do not save upload state in a cache file for resuming.
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
-_group.add_argument('--trash-at', metavar='YYYY-MM-DD HH:MM', default=None,
+_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
                     help="""
 Set the trash date of the resulting collection to an absolute date in the future.
-The accepted format is defined by the ISO 8601 standard.
+The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
+Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone.
 """)
 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
                     help="""
@@ -259,9 +260,8 @@ def parse_arguments(arguments):
 
     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 
-    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
-        if args.filename:
-            arg_parser.error("""
+    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
+        arg_parser.error("""
     --filename argument cannot be used when storing a directory or
     multiple files.
     """)
@@ -484,10 +484,13 @@ class ArvPutUploadJob(object):
         self.follow_links = follow_links
         self.exclude_paths = exclude_paths
         self.exclude_names = exclude_names
-        self.trash_at = trash_at
+        self._trash_at = trash_at
 
-        if self.trash_at is not None and type(self.trash_at) not in [datetime.datetime, datetime.timedelta]:
-            raise TypeError('trash_at should be None, datetime or timedelta')
+        if self._trash_at is not None:
+            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
+                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
+            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
+                raise TypeError('provided trash_at datetime should be timezone-naive')
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
@@ -522,6 +525,9 @@ class ArvPutUploadJob(object):
                 self._write_stdin(self.filename or 'stdin')
             elif not os.path.exists(path):
                  raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
+            elif (not self.follow_links) and os.path.islink(path):
+                self.logger.warning("Skipping symlink '{}'".format(path))
+                continue
             elif os.path.isdir(path):
                 # Use absolute paths on cache index so CWD doesn't interfere
                 # with the caching logic.
@@ -628,11 +634,18 @@ class ArvPutUploadJob(object):
             if self.use_cache:
                 self._cache_file.close()
 
-    def save_collection(self):
-        if type(self.trash_at) == datetime.timedelta:
-            # Get an absolute datetime for trash_at before saving.
-            self.trash_at = datetime.datetime.utcnow() + self.trash_at
+    def _collection_trash_at(self):
+        """
+        Returns the trash date that the collection should use at save time.
+        Takes into account absolute/relative trash_at values requested
+        by the user.
+        """
+        if type(self._trash_at) == datetime.timedelta:
+            # Get an absolute datetime for trash_at
+            return datetime.datetime.utcnow() + self._trash_at
+        return self._trash_at
 
+    def save_collection(self):
         if self.update:
             # Check if files should be updated on the remote collection.
             for fp in self._file_paths:
@@ -646,18 +659,17 @@ class ArvPutUploadJob(object):
                 else:
                     # The file already exist on remote collection, skip it.
                     pass
-            self._remote_collection.save(storage_classes=self.storage_classes,
-                                         num_retries=self.num_retries,
-                                         trash_at=self.trash_at)
+            self._remote_collection.save(num_retries=self.num_retries,
+                                         trash_at=self._collection_trash_at())
         else:
-            if self.storage_classes is None:
-                self.storage_classes = ['default']
+            if len(self._local_collection) == 0:
+                self.logger.warning("No files were uploaded, skipping collection creation.")
+                return
             self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
-                storage_classes=self.storage_classes,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries,
-                trash_at=self.trash_at)
+                trash_at=self._collection_trash_at())
 
     def destroy_cache(self):
         if self.use_cache:
@@ -712,6 +724,15 @@ class ArvPutUploadJob(object):
                     self._save_state()
                 except Exception as e:
                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+            # Keep remote collection's trash_at attribute synced when using relative expire dates
+            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
+                try:
+                    self._api_client.collections().update(
+                        uuid=self._remote_collection.manifest_locator(),
+                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
+                    ).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
@@ -723,7 +744,7 @@ class ArvPutUploadJob(object):
 
     def _write_stdin(self, filename):
         output = self._local_collection.open(filename, 'wb')
-        self._write(sys.stdin, output)
+        self._write(sys.stdin.buffer, output)
         output.close()
 
     def _check_file(self, source, filename):
@@ -847,7 +868,10 @@ class ArvPutUploadJob(object):
                                           update_collection):
             try:
                 self._remote_collection = arvados.collection.Collection(
-                    update_collection, api_client=self._api_client)
+                    update_collection,
+                    api_client=self._api_client,
+                    storage_classes_desired=self.storage_classes,
+                    num_retries=self.num_retries)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -889,8 +913,10 @@ class ArvPutUploadJob(object):
             self._local_collection = arvados.collection.Collection(
                 self._state['manifest'],
                 replication_desired=self.replication_desired,
+                storage_classes_desired=(self.storage_classes or ['default']),
                 put_threads=self.put_threads,
-                api_client=self._api_client)
+                api_client=self._api_client,
+                num_retries=self.num_retries)
 
     def _cached_manifest_valid(self):
         """
@@ -982,6 +1008,9 @@ class ArvPutUploadJob(object):
     def collection_name(self):
         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
+    def collection_trash_at(self):
+        return self._my_collection().get_trash_at()
+
     def manifest_locator(self):
         return self._my_collection().manifest_locator()
 
@@ -1100,22 +1129,38 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     # Trash arguments validation
     trash_at = None
     if args.trash_at is not None:
+        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
+        # make sure the user provides a complete YYYY-MM-DD date.
+        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        # Check if no time information was provided. In that case, assume end-of-day.
+        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
+            args.trash_at += 'T23:59:59'
         try:
             trash_at = ciso8601.parse_datetime(args.trash_at)
         except:
-            logger.error("--trash-at argument format invalid, should be YYYY-MM-DDTHH:MM.")
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
             sys.exit(1)
         else:
             if trash_at.tzinfo is not None:
-                # Timezone-aware datetime provided, convert to non-aware UTC
-                delta = trash_at.tzinfo.utcoffset(None)
-                trash_at = trash_at.replace(tzinfo=None) - delta
+                # Timezone aware datetime provided.
+                utcoffset = -trash_at.utcoffset()
+            else:
+                # Timezone naive datetime provided. Assume is local.
+                if time.daylight:
+                    utcoffset = datetime.timedelta(seconds=time.altzone)
+                else:
+                    utcoffset = datetime.timedelta(seconds=time.timezone)
+            # Convert to UTC timezone naive datetime.
+            trash_at = trash_at.replace(tzinfo=None) + utcoffset
+
         if trash_at <= datetime.datetime.utcnow():
-            logger.error("--trash-at argument should be set in the future")
+            logger.error("--trash-at argument must be set in the future")
             sys.exit(1)
     if args.trash_after is not None:
         if args.trash_after < 1:
-            logger.error("--trash-after argument should be >= 1")
+            logger.error("--trash-after argument must be >= 1")
             sys.exit(1)
         trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
 
@@ -1156,11 +1201,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     #  Split storage-classes argument
     storage_classes = None
     if args.storage_classes:
-        storage_classes = args.storage_classes.strip().split(',')
-        if len(storage_classes) > 1:
-            logger.error("Multiple storage classes are not supported currently.")
-            sys.exit(1)
-
+        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
 
     # Setup exclude regex from all the --exclude arguments provided
     name_patterns = []
@@ -1260,7 +1301,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
-    except arvados.errors.ApiError as error:
+    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
@@ -1275,12 +1316,23 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             output = writer.manifest_text()
     elif args.raw:
         output = ','.join(writer.data_locators())
-    else:
+    elif writer.manifest_locator() is not None:
         try:
+            expiration_notice = ""
+            if writer.collection_trash_at() is not None:
+                # Get the local timezone-naive version, and log it with timezone information.
+                if time.daylight:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
+                else:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
+                expiration_notice = ". It will expire on {} {}.".format(
+                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
             if args.update_collection:
-                logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
+                logger.info(u"Collection updated: '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             else:
-                logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
+                logger.info(u"Collection saved as '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
@@ -1290,6 +1342,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
+    else:
+        status = 1
 
     # Print the locator (uuid) of the new collection.
     if output is None: