11789: Converting a filter() iterable to list, for python3 compatibility.
[arvados.git] / sdk / python / arvados / commands / put.py
index 5b46ba75b70d864589f681f1619500def41781d5..a0f64e1de4a7fa7e2bfd88f4d61c6e074e834d6f 100644 (file)
@@ -1,8 +1,7 @@
-#!/usr/bin/env python
-
-# TODO:
-# --md5sum - display md5 of each file as read from disk
-
+from __future__ import division
+from future.utils import listitems, listvalues
+from builtins import str
+from builtins import object
 import argparse
 import arvados
 import arvados.collection
@@ -11,6 +10,7 @@ import copy
 import datetime
 import errno
 import fcntl
+import fnmatch
 import hashlib
 import json
 import logging
@@ -23,6 +23,8 @@ import sys
 import tempfile
 import threading
 import time
+import traceback
+
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
 
@@ -38,7 +40,9 @@ upload_opts.add_argument('--version', action='version',
                          help='Print version and exit.')
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
-Local file or directory. Default: read from standard input.
+Local file or directory. If path is a directory reference with a trailing
+slash, then just upload the directory's contents; otherwise upload the
+directory itself. Default: read from standard input.
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
@@ -153,6 +157,16 @@ run_opts.add_argument('--name', help="""
 Save the collection with the specified name.
 """)
 
+run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
+                      action='append', help="""
+Exclude files and directories whose names match the given glob pattern. When
+using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
+directory, relative to the provided input dirs will be excluded.
+When using a filename pattern like '*.txt', any text file will be excluded
+no matter where is placed.
+You can specify multiple patterns by using this argument more than once.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
                     help="""
@@ -183,6 +197,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--follow-links', action='store_true', default=True,
+                    dest='follow_links', help="""
+Follow file and directory symlinks (default).
+""")
+_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+                    help="""
+Do not follow file and directory symlinks.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                     help="""
@@ -203,7 +227,7 @@ def parse_arguments(arguments):
     if len(args.paths) == 0:
         args.paths = ['-']
 
-    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
+    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 
     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
         if args.filename:
@@ -231,9 +255,17 @@ def parse_arguments(arguments):
         if not args.filename:
             args.filename = 'stdin'
 
+    # Remove possible duplicated patterns
+    if len(args.exclude) > 0:
+        args.exclude = list(set(args.exclude))
+
     return args
 
 
+class PathDoesNotExistError(Exception):
+    pass
+
+
 class CollectionUpdateError(Exception):
     pass
 
@@ -276,13 +308,13 @@ class ResumeCache(object):
     @classmethod
     def make_path(cls, args):
         md5 = hashlib.md5()
-        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
-        md5.update('\0'.join(realpaths))
+        md5.update(b'\0'.join([p.encode() for p in realpaths]))
         if any(os.path.isdir(path) for path in realpaths):
-            md5.update("-1")
+            md5.update(b'-1')
         elif args.filename:
-            md5.update(args.filename)
+            md5.update(args.filename.encode())
         return os.path.join(
             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
             md5.hexdigest())
@@ -355,17 +387,20 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 bytes_expected=None, name=None, owner_uuid=None,
+                 name=None, owner_uuid=None,
                  ensure_unique_name=False, num_retries=None,
                  put_threads=None, replication_desired=None,
                  filename=None, update_time=60.0, update_collection=None,
-                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
+                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
+                 follow_links=True, exclude_paths=[], exclude_names=None):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
         self.update = False
         self.reporter = reporter
-        self.bytes_expected = bytes_expected
+        # This will set to 0 before start counting, if no special files are going
+        # to be read.
+        self.bytes_expected = None
         self.bytes_written = 0
         self.bytes_skipped = 0
         self.name = name
@@ -382,14 +417,19 @@ class ArvPutUploadJob(object):
         self._collection_lock = threading.Lock()
         self._remote_collection = None # Collection being updated (if asked)
         self._local_collection = None # Collection from previous run manifest
-        self._file_paths = [] # Files to be updated in remote collection
+        self._file_paths = set() # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self._upload_started = False
         self.logger = logger
         self.dry_run = dry_run
+        self._checkpoint_before_quit = True
+        self.follow_links = follow_links
+        self.exclude_paths = exclude_paths
+        self.exclude_names = exclude_names
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
@@ -401,60 +441,134 @@ class ArvPutUploadJob(object):
         # Load cached data if any and if needed
         self._setup_state(update_collection)
 
+        # Build the upload file list, excluding requested files and counting the
+        # bytes expected to be uploaded.
+        self._build_upload_list()
+
+    def _build_upload_list(self):
+        """
+        Scan the requested paths to count file sizes, excluding files & dirs if requested
+        and building the upload file list.
+        """
+        # If there aren't special files to be read, reset total bytes count to zero
+        # to start counting.
+        if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
+                          self.paths)):
+            self.bytes_expected = 0
+
+        for path in self.paths:
+            # Test for stdin first, in case some file named '-' exist
+            if path == '-':
+                if self.dry_run:
+                    raise ArvPutUploadIsPending()
+                self._write_stdin(self.filename or 'stdin')
+            elif not os.path.exists(path):
+                 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+            elif os.path.isdir(path):
+                # Use absolute paths on cache index so CWD doesn't interfere
+                # with the caching logic.
+                orig_path = path
+                path = os.path.abspath(path)
+                if orig_path[-1:] == os.sep:
+                    # When passing a directory reference with a trailing slash,
+                    # its contents should be uploaded directly to the
+                    # collection's root.
+                    prefixdir = path
+                else:
+                    # When passing a directory reference with no trailing slash,
+                    # upload the directory to the collection's root.
+                    prefixdir = os.path.dirname(path)
+                prefixdir += os.sep
+                for root, dirs, files in os.walk(path,
+                                                 followlinks=self.follow_links):
+                    root_relpath = os.path.relpath(root, path)
+                    if root_relpath == '.':
+                        root_relpath = ''
+                    # Exclude files/dirs by full path matching pattern
+                    if self.exclude_paths:
+                        dirs[:] = list(filter(
+                            lambda d: not any(
+                                [pathname_match(os.path.join(root_relpath, d),
+                                                pat)
+                                 for pat in self.exclude_paths]),
+                            dirs))
+                        files = list(filter(
+                            lambda f: not any(
+                                [pathname_match(os.path.join(root_relpath, f),
+                                                pat)
+                                 for pat in self.exclude_paths]),
+                            files))
+                    # Exclude files/dirs by name matching pattern
+                    if self.exclude_names is not None:
+                        dirs[:] = list(filter(lambda d: not self.exclude_names.match(d), dirs))
+                        files = list(filter(lambda f: not self.exclude_names.match(f), files))
+                    # Make os.walk()'s dir traversing order deterministic
+                    dirs.sort()
+                    files.sort()
+                    for f in files:
+                        filepath = os.path.join(root, f)
+                        # Add its size to the total bytes count (if applicable)
+                        if self.follow_links or (not os.path.islink(filepath)):
+                            if self.bytes_expected is not None:
+                                self.bytes_expected += os.path.getsize(filepath)
+                        self._check_file(filepath,
+                                         os.path.join(root[len(prefixdir):], f))
+            else:
+                filepath = os.path.abspath(path)
+                # Add its size to the total bytes count (if applicable)
+                if self.follow_links or (not os.path.islink(filepath)):
+                    if self.bytes_expected is not None:
+                        self.bytes_expected += os.path.getsize(filepath)
+                self._check_file(filepath,
+                                 self.filename or os.path.basename(path))
+        # If dry-mode is on, and got up to this point, then we should notify that
+        # there aren't any file to upload.
+        if self.dry_run:
+            raise ArvPutUploadNotPending()
+        # Remove local_collection's files that don't exist locally anymore, so the
+        # bytes_written count is correct.
+        for f in self.collection_file_paths(self._local_collection,
+                                            path_prefix=""):
+            if f != 'stdin' and f != self.filename and not f in self._file_paths:
+                self._local_collection.remove(f)
+
     def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
-        if not self.dry_run:
-            self._checkpointer.start()
+        self._checkpointer.start()
         try:
-            for path in self.paths:
-                # Test for stdin first, in case some file named '-' exist
-                if path == '-':
-                    if self.dry_run:
-                        raise ArvPutUploadIsPending()
-                    self._write_stdin(self.filename or 'stdin')
-                elif os.path.isdir(path):
-                    # Use absolute paths on cache index so CWD doesn't interfere
-                    # with the caching logic.
-                    prefixdir = path = os.path.abspath(path)
-                    if prefixdir != '/':
-                        prefixdir += '/'
-                    for root, dirs, files in os.walk(path):
-                        # Make os.walk()'s dir traversing order deterministic
-                        dirs.sort()
-                        files.sort()
-                        for f in files:
-                            self._check_file(os.path.join(root, f),
-                                             os.path.join(root[len(prefixdir):], f))
-                else:
-                    self._check_file(os.path.abspath(path),
-                                     self.filename or os.path.basename(path))
-            # If dry-mode is on, and got up to this point, then we should notify that
-            # there aren't any file to upload.
-            if self.dry_run:
-                raise ArvPutUploadNotPending()
-            # Remove local_collection's files that don't exist locally anymore, so the
-            # bytes_written count is correct.
-            for f in self.collection_file_paths(self._local_collection,
-                                                path_prefix=""):
-                if f != 'stdin' and f != self.filename and not f in self._file_paths:
-                    self._local_collection.remove(f)
             # Update bytes_written from current local collection and
             # report initial progress.
             self._update()
             # Actual file upload
+            self._upload_started = True # Used by the update thread to start checkpointing
             self._upload_files()
+        except (SystemExit, Exception) as e:
+            self._checkpoint_before_quit = False
+            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
+            # Note: We're expecting SystemExit instead of
+            # KeyboardInterrupt because we have a custom signal
+            # handler in place that raises SystemExit with the catched
+            # signal's code.
+            if isinstance(e, PathDoesNotExistError):
+                # We aren't interested in the traceback for this case
+                pass
+            elif not isinstance(e, SystemExit) or e.code != -2:
+                self.logger.warning("Abnormal termination:\n{}".format(
+                    traceback.format_exc()))
+            raise
         finally:
             if not self.dry_run:
                 # Stop the thread before doing anything else
                 self._stop_checkpointer.set()
                 self._checkpointer.join()
-                # Commit all pending blocks & one last _update()
-                self._local_collection.manifest_text()
-                self._update(final=True)
-                if save_collection:
-                    self.save_collection()
+                if self._checkpoint_before_quit:
+                    # Commit all pending blocks & one last _update()
+                    self._local_collection.manifest_text()
+                    self._update(final=True)
+                    if save_collection:
+                        self.save_collection()
             if self.use_cache:
                 self._cache_file.close()
 
@@ -494,7 +608,7 @@ class ArvPutUploadJob(object):
         Recursively get the total size of the collection
         """
         size = 0
-        for item in collection.values():
+        for item in listvalues(collection):
             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                 size += self._collection_size(item)
             else:
@@ -506,28 +620,34 @@ class ArvPutUploadJob(object):
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
-        while not self._stop_checkpointer.wait(self._update_task_time):
+        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
             self._update()
 
     def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
+        if self._upload_started:
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._local_collection)
+                if self.use_cache:
+                    if final:
+                        manifest = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without comitting pending blocks
+                        manifest = self._local_collection.manifest_text(strip=False,
+                                                                        normalize=False,
+                                                                        only_committed=True)
+                    # Update cache
+                    with self._state_lock:
+                        self._state['manifest'] = manifest
             if self.use_cache:
-                if final:
-                    manifest = self._local_collection.manifest_text()
-                else:
-                    # Get the manifest text without comitting pending blocks
-                    manifest = self._local_collection.manifest_text(strip=False,
-                                                                    normalize=False,
-                                                                    only_committed=True)
-                # Update cache
-                with self._state_lock:
-                    self._state['manifest'] = manifest
-        if self.use_cache:
-            self._save_state()
+                try:
+                    self._save_state()
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+        else:
+            self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         self.report_progress()
 
@@ -536,17 +656,22 @@ class ArvPutUploadJob(object):
             self.reporter(self.bytes_written, self.bytes_expected)
 
     def _write_stdin(self, filename):
-        output = self._local_collection.open(filename, 'w')
+        output = self._local_collection.open(filename, 'wb')
         self._write(sys.stdin, output)
         output.close()
 
     def _check_file(self, source, filename):
-        """Check if this file needs to be uploaded"""
+        """
+        Check if this file needs to be uploaded
+        """
+        # Ignore symlinks when requested
+        if (not self.follow_links) and os.path.islink(source):
+            return
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
         # Record file path for updating the remote collection before exiting
-        self._file_paths.append(filename)
+        self._file_paths.add(filename)
 
         with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
@@ -598,21 +723,27 @@ class ArvPutUploadJob(object):
             should_upload = True
 
         if should_upload:
-            self._files_to_upload.append((source, resume_offset, filename))
+            try:
+                self._files_to_upload.append((source, resume_offset, filename))
+            except ArvPutUploadIsPending:
+                # This could happen when running on dry-mode, close cache file to
+                # avoid locking issues.
+                self._cache_file.close()
+                raise
 
     def _upload_files(self):
         for source, resume_offset, filename in self._files_to_upload:
-            with open(source, 'r') as source_fd:
+            with open(source, 'rb') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
                     self._state['files'][source]['size'] = os.path.getsize(source)
                 if resume_offset > 0:
                     # Start upload where we left off
-                    output = self._local_collection.open(filename, 'a')
+                    output = self._local_collection.open(filename, 'ab')
                     source_fd.seek(resume_offset)
                 else:
                     # Start from scratch
-                    output = self._local_collection.open(filename, 'w')
+                    output = self._local_collection.open(filename, 'wb')
                 self._write(source_fd, output)
                 output.close(flush=False)
 
@@ -646,19 +777,21 @@ class ArvPutUploadJob(object):
         if self.use_cache:
             # Set up cache file name from input paths.
             md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update('\0'.join(realpaths))
+            md5.update(b'\0'.join([p.encode() for p in realpaths]))
             if self.filename:
-                md5.update(self.filename)
+                md5.update(self.filename.encode())
             cache_filename = md5.hexdigest()
             cache_filepath = os.path.join(
                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                 cache_filename)
-            if self.resume:
+            if self.resume and os.path.exists(cache_filepath):
+                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'a+')
             else:
                 # --no-resume means start with a empty cache file.
+                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
@@ -675,6 +808,7 @@ class ArvPutUploadJob(object):
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
             else:
+                self.logger.info("No cache usage requested for this run.")
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
@@ -683,7 +817,7 @@ class ArvPutUploadJob(object):
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively go through the entire collection `col`"""
         file_paths = []
-        for name, item in col.items():
+        for name, item in listitems(col):
             if isinstance(item, arvados.arvfile.ArvadosFile):
                 file_paths.append(os.path.join(path_prefix, name))
             elif isinstance(item, arvados.collection.Subcollection):
@@ -701,17 +835,20 @@ class ArvPutUploadJob(object):
         """
         Atomically save current state into cache.
         """
+        with self._state_lock:
+            # We're not using copy.deepcopy() here because it's a lot slower
+            # than json.dumps(), and we're already needing JSON format to be
+            # saved on disk.
+            state = json.dumps(self._state)
         try:
-            with self._state_lock:
-                state = copy.deepcopy(self._state)
-            new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_filename))
-            self._lock_file(new_cache_fd)
-            new_cache = os.fdopen(new_cache_fd, 'r+')
-            json.dump(state, new_cache)
+            new_cache = tempfile.NamedTemporaryFile(
+                mode='w+',
+                dir=os.path.dirname(self._cache_filename), delete=False)
+            self._lock_file(new_cache)
+            new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_filename)
+            os.rename(new_cache.name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
             self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
@@ -729,7 +866,14 @@ class ArvPutUploadJob(object):
         return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        return self._my_collection().portable_data_hash()
+        pdh = self._my_collection().portable_data_hash()
+        m = self._my_collection().stripped_manifest().encode()
+        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
+        if pdh != local_pdh:
+            logger.warning("\n".join([
+                "arv-put: API server provided PDH differs from local manifest.",
+                "         This should not happen; showing API server version."]))
+        return pdh
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         return self._my_collection().manifest_text(stream_name, strip, normalize)
@@ -750,7 +894,7 @@ class ArvPutUploadJob(object):
                     locators.append(loc)
                 return locators
         elif isinstance(item, arvados.collection.Collection):
-            l = [self._datablocks_on_item(x) for x in item.values()]
+            l = [self._datablocks_on_item(x) for x in listvalues(item)]
             # Fast list flattener method taken from:
             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
             return [loc for sublist in l for loc in sublist]
@@ -764,23 +908,24 @@ class ArvPutUploadJob(object):
             datablocks = self._datablocks_on_item(self._my_collection())
         return datablocks
 
-
-def expected_bytes_for(pathlist):
-    # Walk the given directory trees and stat files, adding up file sizes,
-    # so we can display progress as percent
-    bytesum = 0
-    for path in pathlist:
-        if os.path.isdir(path):
-            for filename in arvados.util.listdir_recursive(path):
-                bytesum += os.path.getsize(os.path.join(path, filename))
-        elif not os.path.isfile(path):
-            return None
-        else:
-            bytesum += os.path.getsize(path)
-    return bytesum
-
 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                             os.getpid())
+
+# Simulate glob.glob() matching behavior without the need to scan the filesystem
+# Note: fnmatch() doesn't work correctly when used with pathnames. For example the
+# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
+# so instead we're using it on every path component.
+def pathname_match(pathname, pattern):
+    name = pathname.split(os.sep)
+    # Fix patterns like 'some/subdir/' or 'some//subdir'
+    pat = [x for x in pattern.split(os.sep) if x != '']
+    if len(name) != len(pat):
+        return False
+    for i in range(len(name)):
+        if not fnmatch.fnmatch(name[i], pat[i]):
+            return False
+    return True
+
 def machine_progress(bytes_written, bytes_expected):
     return _machine_format.format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
@@ -816,6 +961,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
     logger = logging.getLogger('arvados.arv_put')
+    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -855,15 +1001,53 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
-    bytes_expected = expected_bytes_for(args.paths)
-
+    # Setup exclude regex from all the --exclude arguments provided
+    name_patterns = []
+    exclude_paths = []
+    exclude_names = None
+    if len(args.exclude) > 0:
+        # We're supporting 2 kinds of exclusion patterns:
+        # 1) --exclude '*.jpg'      (file/dir name patterns, will only match
+        #                            the name)
+        # 2) --exclude 'foo/bar'    (file/dir path patterns, will match the
+        #                            entire path, and should be relative to
+        #                            any input dir argument)
+        for p in args.exclude:
+            # Only relative paths patterns allowed
+            if p.startswith(os.sep):
+                logger.error("Cannot use absolute paths with --exclude")
+                sys.exit(1)
+            if os.path.dirname(p):
+                # We don't support of path patterns with '.' or '..'
+                p_parts = p.split(os.sep)
+                if '.' in p_parts or '..' in p_parts:
+                    logger.error(
+                        "Cannot use path patterns that include '.' or '..'")
+                    sys.exit(1)
+                # Path search pattern
+                exclude_paths.append(p)
+            else:
+                # Name-only search pattern
+                name_patterns.append(p)
+        # For name only matching, we can combine all patterns into a single regexp,
+        # for better performance.
+        exclude_names = re.compile('|'.join(
+            [fnmatch.translate(p) for p in name_patterns]
+        )) if len(name_patterns) > 0 else None
+        # Show the user the patterns to be used, just in case they weren't specified inside
+        # quotes and got changed by the shell expansion.
+        logger.info("Exclude patterns: {}".format(args.exclude))
+
+    # If this is used by a human, and there's at least one directory to be
+    # uploaded, the expected bytes calculation can take a moment.
+    if args.progress and any([os.path.isdir(f) for f in args.paths]):
+        logger.info("Calculating upload size, this could take some time...")
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
-                                 bytes_expected = bytes_expected,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  put_threads = args.threads,
@@ -872,7 +1056,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  ensure_unique_name = True,
                                  update_collection = args.update_collection,
                                  logger=logger,
-                                 dry_run=args.dry_run)
+                                 dry_run=args.dry_run,
+                                 follow_links=args.follow_links,
+                                 exclude_paths=exclude_paths,
+                                 exclude_names=exclude_names)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -888,6 +1075,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     except ArvPutUploadNotPending:
         # No files pending for upload
         sys.exit(0)
+    except PathDoesNotExistError as error:
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
+        sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
@@ -908,12 +1099,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
-    except ArvPutUploadIsPending:
-        # Dry run check successful, return proper exit code.
-        sys.exit(2)
-    except ArvPutUploadNotPending:
-        # No files pending for upload
-        sys.exit(0)
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
         logger.info("\n")
@@ -949,7 +1134,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         if not output.endswith('\n'):
             stdout.write('\n')
 
-    for sigcode, orig_handler in orig_signal_handlers.items():
+    for sigcode, orig_handler in listitems(orig_signal_handlers):
         signal.signal(sigcode, orig_handler)
 
     if status != 0: