11789: Merge branch 'master' into 11789-arvput-exclude-flag
[arvados.git] / sdk / python / arvados / commands / put.py
index 548f4b0948ae715393eb657a1693364e8b500639..cd559146146da82c82dfa39485f1567e1018d78d 100644 (file)
@@ -14,6 +14,7 @@ import copy
 import datetime
 import errno
 import fcntl
+import fnmatch
 import hashlib
 import json
 import logging
@@ -160,6 +161,16 @@ run_opts.add_argument('--name', help="""
 Save the collection with the specified name.
 """)
 
+run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
+                      action='append', help="""
+Exclude files and directories whose names match the given glob pattern. When
+using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
+directory, relative to the provided input dirs will be excluded.
+When using a filename pattern like '*.txt', any text file will be excluded
+no matter where is placed.
+You can specify multiple patterns by using this argument more than once.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
                     help="""
@@ -248,6 +259,10 @@ def parse_arguments(arguments):
         if not args.filename:
             args.filename = 'stdin'
 
+    # Remove possible duplicated patterns
+    if len(args.exclude) > 0:
+        args.exclude = list(set(args.exclude))
+
     return args
 
 
@@ -376,18 +391,20 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 bytes_expected=None, name=None, owner_uuid=None,
+                 name=None, owner_uuid=None,
                  ensure_unique_name=False, num_retries=None,
                  put_threads=None, replication_desired=None,
                  filename=None, update_time=60.0, update_collection=None,
                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
-                 follow_links=True):
+                 follow_links=True, exclude_paths=[], exclude_names=None):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
         self.update = False
         self.reporter = reporter
-        self.bytes_expected = bytes_expected
+        # This will be set to 0 before counting starts, if no special files are going
+        # to be read.
+        self.bytes_expected = None
         self.bytes_written = 0
         self.bytes_skipped = 0
         self.name = name
@@ -415,6 +432,8 @@ class ArvPutUploadJob(object):
         self.dry_run = dry_run
         self._checkpoint_before_quit = True
         self.follow_links = follow_links
+        self.exclude_paths = exclude_paths
+        self.exclude_names = exclude_names
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
@@ -426,55 +445,103 @@ class ArvPutUploadJob(object):
         # Load cached data if any and if needed
         self._setup_state(update_collection)
 
+        # Build the upload file list, excluding requested files and counting the
+        # bytes expected to be uploaded.
+        self._build_upload_list()
+
+    def _build_upload_list(self):
+        """
+        Scan the requested paths to count file sizes, excluding files & dirs if requested
+        and building the upload file list.
+        """
+        # If there aren't special files to be read, reset total bytes count to zero
+        # to start counting.
+        if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
+                          self.paths)):
+            self.bytes_expected = 0
+
+        for path in self.paths:
+            # Test for stdin first, in case some file named '-' exists
+            if path == '-':
+                if self.dry_run:
+                    raise ArvPutUploadIsPending()
+                self._write_stdin(self.filename or 'stdin')
+            elif not os.path.exists(path):
+                 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+            elif os.path.isdir(path):
+                # Use absolute paths on cache index so CWD doesn't interfere
+                # with the caching logic.
+                orig_path = path
+                path = os.path.abspath(path)
+                if orig_path[-1:] == os.sep:
+                    # When passing a directory reference with a trailing slash,
+                    # its contents should be uploaded directly to the
+                    # collection's root.
+                    prefixdir = path
+                else:
+                    # When passing a directory reference with no trailing slash,
+                    # upload the directory to the collection's root.
+                    prefixdir = os.path.dirname(path)
+                prefixdir += os.sep
+                for root, dirs, files in os.walk(path,
+                                                 followlinks=self.follow_links):
+                    root_relpath = os.path.relpath(root, path)
+                    if root_relpath == '.':
+                        root_relpath = ''
+                    # Exclude files/dirs by full path matching pattern
+                    if self.exclude_paths:
+                        dirs[:] = list(filter(
+                            lambda d: not any(
+                                [pathname_match(os.path.join(root_relpath, d),
+                                                pat)
+                                 for pat in self.exclude_paths]),
+                            dirs))
+                        files = list(filter(
+                            lambda f: not any(
+                                [pathname_match(os.path.join(root_relpath, f),
+                                                pat)
+                                 for pat in self.exclude_paths]),
+                            files))
+                    # Exclude files/dirs by name matching pattern
+                    if self.exclude_names is not None:
+                        dirs[:] = list(filter(lambda d: not self.exclude_names.match(d), dirs))
+                        files = list(filter(lambda f: not self.exclude_names.match(f), files))
+                    # Make os.walk()'s dir traversing order deterministic
+                    dirs.sort()
+                    files.sort()
+                    for f in files:
+                        filepath = os.path.join(root, f)
+                        # Add its size to the total bytes count (if applicable)
+                        if self.follow_links or (not os.path.islink(filepath)):
+                            if self.bytes_expected is not None:
+                                self.bytes_expected += os.path.getsize(filepath)
+                        self._check_file(filepath,
+                                         os.path.join(root[len(prefixdir):], f))
+            else:
+                filepath = os.path.abspath(path)
+                # Add its size to the total bytes count (if applicable)
+                if self.follow_links or (not os.path.islink(filepath)):
+                    if self.bytes_expected is not None:
+                        self.bytes_expected += os.path.getsize(filepath)
+                self._check_file(filepath,
+                                 self.filename or os.path.basename(path))
+        # If dry-run mode is on and we got up to this point, then we should notify
+        # that there aren't any files to upload.
+        if self.dry_run:
+            raise ArvPutUploadNotPending()
+        # Remove local_collection's files that don't exist locally anymore, so the
+        # bytes_written count is correct.
+        for f in self.collection_file_paths(self._local_collection,
+                                            path_prefix=""):
+            if f != 'stdin' and f != self.filename and not f in self._file_paths:
+                self._local_collection.remove(f)
+
     def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
-        if not self.dry_run:
-            self._checkpointer.start()
+        self._checkpointer.start()
         try:
-            for path in self.paths:
-                # Test for stdin first, in case some file named '-' exist
-                if path == '-':
-                    if self.dry_run:
-                        raise ArvPutUploadIsPending()
-                    self._write_stdin(self.filename or 'stdin')
-                elif not os.path.exists(path):
-                     raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
-                elif os.path.isdir(path):
-                    # Use absolute paths on cache index so CWD doesn't interfere
-                    # with the caching logic.
-                    orig_path = path
-                    path = os.path.abspath(path)
-                    if orig_path[-1:] == os.sep:
-                        # When passing a directory reference with a trailing slash,
-                        # its contents should be uploaded directly to the collection's root.
-                        prefixdir = path
-                    else:
-                        # When passing a directory reference with no trailing slash,
-                        # upload the directory to the collection's root.
-                        prefixdir = os.path.dirname(path)
-                    prefixdir += os.sep
-                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
-                        # Make os.walk()'s dir traversing order deterministic
-                        dirs.sort()
-                        files.sort()
-                        for f in files:
-                            self._check_file(os.path.join(root, f),
-                                             os.path.join(root[len(prefixdir):], f))
-                else:
-                    self._check_file(os.path.abspath(path),
-                                     self.filename or os.path.basename(path))
-            # If dry-mode is on, and got up to this point, then we should notify that
-            # there aren't any file to upload.
-            if self.dry_run:
-                raise ArvPutUploadNotPending()
-            # Remove local_collection's files that don't exist locally anymore, so the
-            # bytes_written count is correct.
-            for f in self.collection_file_paths(self._local_collection,
-                                                path_prefix=""):
-                if f != 'stdin' and f != self.filename and not f in self._file_paths:
-                    self._local_collection.remove(f)
             # Update bytes_written from current local collection and
             # report initial progress.
             self._update()
@@ -660,7 +727,13 @@ class ArvPutUploadJob(object):
             should_upload = True
 
         if should_upload:
-            self._files_to_upload.append((source, resume_offset, filename))
+            try:
+                self._files_to_upload.append((source, resume_offset, filename))
+            except ArvPutUploadIsPending:
+                # This could happen when running in dry-run mode; close the cache
+                # file to avoid locking issues.
+                self._cache_file.close()
+                raise
 
     def _upload_files(self):
         for source, resume_offset, filename in self._files_to_upload:
@@ -839,29 +912,24 @@ class ArvPutUploadJob(object):
             datablocks = self._datablocks_on_item(self._my_collection())
         return datablocks
 
-
-def expected_bytes_for(pathlist, follow_links=True):
-    # Walk the given directory trees and stat files, adding up file sizes,
-    # so we can display progress as percent
-    bytesum = 0
-    for path in pathlist:
-        if os.path.isdir(path):
-            for root, dirs, files in os.walk(path, followlinks=follow_links):
-                # Sum file sizes
-                for f in files:
-                    filepath = os.path.join(root, f)
-                    # Ignore symlinked files when requested
-                    if (not follow_links) and os.path.islink(filepath):
-                        continue
-                    bytesum += os.path.getsize(filepath)
-        elif not os.path.isfile(path):
-            return None
-        else:
-            bytesum += os.path.getsize(path)
-    return bytesum
-
 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                             os.getpid())
+
+# Simulate glob.glob() matching behavior without the need to scan the filesystem
+# Note: fnmatch() doesn't work correctly when used with pathnames. For example the
+# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
+# so instead we're using it on every path component.
+def pathname_match(pathname, pattern):
+    name = pathname.split(os.sep)
+    # Fix patterns like 'some/subdir/' or 'some//subdir'
+    pat = [x for x in pattern.split(os.sep) if x != '']
+    if len(name) != len(pat):
+        return False
+    for i in range(len(name)):
+        if not fnmatch.fnmatch(name[i], pat[i]):
+            return False
+    return True
+
 def machine_progress(bytes_written, bytes_expected):
     return _machine_format.format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
@@ -937,19 +1005,53 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
+    # Setup exclude regex from all the --exclude arguments provided
+    name_patterns = []
+    exclude_paths = []
+    exclude_names = None
+    if len(args.exclude) > 0:
+        # We're supporting 2 kinds of exclusion patterns:
+        # 1) --exclude '*.jpg'      (file/dir name patterns, will only match
+        #                            the name)
+        # 2) --exclude 'foo/bar'    (file/dir path patterns, will match the
+        #                            entire path, and should be relative to
+        #                            any input dir argument)
+        for p in args.exclude:
+            # Only relative path patterns are allowed
+            if p.startswith(os.sep):
+                logger.error("Cannot use absolute paths with --exclude")
+                sys.exit(1)
+            if os.path.dirname(p):
+                # We don't support path patterns with '.' or '..'
+                p_parts = p.split(os.sep)
+                if '.' in p_parts or '..' in p_parts:
+                    logger.error(
+                        "Cannot use path patterns that include '.' or '..'")
+                    sys.exit(1)
+                # Path search pattern
+                exclude_paths.append(p)
+            else:
+                # Name-only search pattern
+                name_patterns.append(p)
+        # For name only matching, we can combine all patterns into a single regexp,
+        # for better performance.
+        exclude_names = re.compile('|'.join(
+            [fnmatch.translate(p) for p in name_patterns]
+        )) if len(name_patterns) > 0 else None
+        # Show the user the patterns to be used, just in case they weren't specified inside
+        # quotes and got changed by the shell expansion.
+        logger.info("Exclude patterns: {}".format(args.exclude))
+
     # If this is used by a human, and there's at least one directory to be
     # uploaded, the expected bytes calculation can take a moment.
     if args.progress and any([os.path.isdir(f) for f in args.paths]):
         logger.info("Calculating upload size, this could take some time...")
-    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)
-
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
-                                 bytes_expected = bytes_expected,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  put_threads = args.threads,
@@ -959,7 +1061,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  update_collection = args.update_collection,
                                  logger=logger,
                                  dry_run=args.dry_run,
-                                 follow_links=args.follow_links)
+                                 follow_links=args.follow_links,
+                                 exclude_paths=exclude_paths,
+                                 exclude_names=exclude_names)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -975,6 +1079,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     except ArvPutUploadNotPending:
         # No files pending for upload
         sys.exit(0)
+    except PathDoesNotExistError as error:
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
+        sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
@@ -995,16 +1103,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
-    except ArvPutUploadIsPending:
-        # Dry run check successful, return proper exit code.
-        sys.exit(2)
-    except ArvPutUploadNotPending:
-        # No files pending for upload
-        sys.exit(0)
-    except PathDoesNotExistError as error:
-        logger.error("\n".join([
-            "arv-put: %s" % str(error)]))
-        sys.exit(1)
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
         logger.info("\n")