11789: Converting a filter() iterable to list, for python3 compatibility.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 5b46ba75b70d864589f681f1619500def41781d5..a0f64e1de4a7fa7e2bfd88f4d61c6e074e834d6f 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
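
Under Python 3, filter(), map(), and dict views return lazy iterators rather
than lists, so any code that indexes the result or iterates it more than once
must materialize a list first; the future.utils helpers (listvalues(),
listitems()) do the same for dicts portably across Python 2 and 3. A minimal
sketch of the pattern this commit applies (assumes the future package is
installed, as the new imports below require):

    # filter() is lazy on Python 3; list() materializes the results.
    evens = list(filter(lambda n: n % 2 == 0, range(10)))
    assert evens == [0, 2, 4, 6, 8]   # evens[0] would fail without list()

    # Portable dict helpers used throughout this patch.
    from future.utils import listitems, listvalues
    d = {'a': 1, 'b': 2}
    assert sorted(listvalues(d)) == [1, 2]
    assert sorted(listitems(d)) == [('a', 1), ('b', 2)]
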
@@ -1,8 +1,7 @@
-#!/usr/bin/env python
-
-# TODO:
-# --md5sum - display md5 of each file as read from disk
-
+from __future__ import division
+from future.utils import listitems, listvalues
+from builtins import str
+from builtins import object
 import argparse
 import arvados
 import arvados.collection
@@ -11,6 +10,7 @@ import copy
 import datetime
 import errno
 import fcntl
+import fnmatch
 import hashlib
 import json
 import logging
@@ -23,6 +23,8 @@ import sys
 import tempfile
 import threading
 import time
+import traceback
+
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
 
@@ -38,7 +40,9 @@ upload_opts.add_argument('--version', action='version',
                          help='Print version and exit.')
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
-Local file or directory. Default: read from standard input.
+Local file or directory. If path is a directory reference with a trailing
+slash, then just upload the directory's contents; otherwise upload the
+directory itself. Default: read from standard input.
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
@@ -153,6 +157,16 @@ run_opts.add_argument('--name', help="""
 Save the collection with the specified name.
 """)
 
+run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
+                      action='append', help="""
+Exclude files and directories whose names match the given glob pattern. When
+using a path-like pattern like 'subdir/*.txt', all text files inside the
+'subdir' directory (relative to any of the provided input directories) will
+be excluded. When using a filename pattern like '*.txt', any text file will
+be excluded no matter where it is placed.
+You can specify multiple patterns by using this argument more than once.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
                     help="""
@@ -183,6 +197,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--follow-links', action='store_true', default=True,
+                    dest='follow_links', help="""
+Follow file and directory symlinks (default).
+""")
+_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+                    help="""
+Do not follow file and directory symlinks.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                     help="""
@@ -203,7 +227,7 @@ def parse_arguments(arguments):
     if len(args.paths) == 0:
         args.paths = ['-']
 
-    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
+    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 
     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
         if args.filename:
@@ -231,9 +255,17 @@ def parse_arguments(arguments):
         if not args.filename:
             args.filename = 'stdin'
 
+    # Remove possibly duplicated patterns
+    if len(args.exclude) > 0:
+        args.exclude = list(set(args.exclude))
+
     return args
 
 
+class PathDoesNotExistError(Exception):
+    pass
+
+
 class CollectionUpdateError(Exception):
     pass
 
@@ -276,13 +308,13 @@ class ResumeCache(object):
     @classmethod
     def make_path(cls, args):
         md5 = hashlib.md5()
-        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
-        md5.update('\0'.join(realpaths))
+        md5.update(b'\0'.join([p.encode() for p in realpaths]))
         if any(os.path.isdir(path) for path in realpaths):
-            md5.update("-1")
+            md5.update(b'-1')
         elif args.filename:
-            md5.update(args.filename)
+            md5.update(args.filename.encode())
         return os.path.join(
             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
             md5.hexdigest())
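
The .encode() calls above are required because hashlib digests operate on
bytes under Python 3; passing str raises TypeError. A quick illustration of
the same cache-key construction (host name and paths are made up):

    import hashlib

    md5 = hashlib.md5()
    md5.update(b'example.arvadosapi.com')
    md5.update(b'\0'.join([p.encode() for p in ['/data/a', '/data/b']]))
    cache_name = md5.hexdigest()   # stable name for the resume-cache file
    # md5.update('plain str') would raise TypeError on Python 3:
    # "Unicode-objects must be encoded before hashing"
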
@@ -355,17 +387,20 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 bytes_expected=None, name=None, owner_uuid=None,
+                 name=None, owner_uuid=None,
                  ensure_unique_name=False, num_retries=None,
                  put_threads=None, replication_desired=None,
                  filename=None, update_time=60.0, update_collection=None,
-                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
+                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
+                 follow_links=True, exclude_paths=[], exclude_names=None):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
         self.update = False
         self.reporter = reporter
-        self.bytes_expected = bytes_expected
+        # This will be set to 0 before we start counting, provided no special
+        # files (e.g. stdin) are going to be read.
+        self.bytes_expected = None
         self.bytes_written = 0
         self.bytes_skipped = 0
         self.name = name
@@ -382,14 +417,19 @@ class ArvPutUploadJob(object):
         self._collection_lock = threading.Lock()
         self._remote_collection = None # Collection being updated (if asked)
         self._local_collection = None # Collection from previous run manifest
-        self._file_paths = [] # Files to be updated in remote collection
+        self._file_paths = set() # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self._upload_started = False
         self.logger = logger
         self.dry_run = dry_run
+        self._checkpoint_before_quit = True
+        self.follow_links = follow_links
+        self.exclude_paths = exclude_paths
+        self.exclude_names = exclude_names
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
@@ -401,60 +441,134 @@ class ArvPutUploadJob(object):
         # Load cached data if any and if needed
         self._setup_state(update_collection)
 
+        # Build the upload file list, excluding requested files and counting the
+        # bytes expected to be uploaded.
+        self._build_upload_list()
+
+    def _build_upload_list(self):
+        """
+        Scan the requested paths to count file sizes, exclude files & dirs if
+        requested, and build the upload file list.
+        """
+        # If there are no special files to be read, reset the total bytes count
+        # to zero so we can start counting.
+        if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
+                          self.paths)):
+            self.bytes_expected = 0
+
+        for path in self.paths:
+            # Test for stdin first, in case some file named '-' exists
+            if path == '-':
+                if self.dry_run:
+                    raise ArvPutUploadIsPending()
+                self._write_stdin(self.filename or 'stdin')
+            elif not os.path.exists(path):
+                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+            elif os.path.isdir(path):
+                # Use absolute paths on cache index so CWD doesn't interfere
+                # with the caching logic.
+                orig_path = path
+                path = os.path.abspath(path)
+                if orig_path[-1:] == os.sep:
+                    # When passing a directory reference with a trailing slash,
+                    # its contents should be uploaded directly to the
+                    # collection's root.
+                    prefixdir = path
+                else:
+                    # When passing a directory reference with no trailing slash,
+                    # upload the directory to the collection's root.
+                    prefixdir = os.path.dirname(path)
+                prefixdir += os.sep
+                for root, dirs, files in os.walk(path,
+                                                 followlinks=self.follow_links):
+                    root_relpath = os.path.relpath(root, path)
+                    if root_relpath == '.':
+                        root_relpath = ''
+                    # Exclude files/dirs by full path matching pattern
+                    if self.exclude_paths:
+                        dirs[:] = list(filter(
+                            lambda d: not any(
+                                [pathname_match(os.path.join(root_relpath, d),
+                                                pat)
+                                 for pat in self.exclude_paths]),
+                            dirs))
+                        files = list(filter(
+                            lambda f: not any(
+                                [pathname_match(os.path.join(root_relpath, f),
+                                                pat)
+                                 for pat in self.exclude_paths]),
+                            files))
+                    # Exclude files/dirs by name matching pattern
+                    if self.exclude_names is not None:
+                        dirs[:] = list(filter(lambda d: not self.exclude_names.match(d), dirs))
+                        files = list(filter(lambda f: not self.exclude_names.match(f), files))
+                    # Make os.walk()'s dir traversing order deterministic
+                    dirs.sort()
+                    files.sort()
+                    for f in files:
+                        filepath = os.path.join(root, f)
+                        # Add its size to the total bytes count (if applicable)
+                        if self.follow_links or (not os.path.islink(filepath)):
+                            if self.bytes_expected is not None:
+                                self.bytes_expected += os.path.getsize(filepath)
+                        self._check_file(filepath,
+                                         os.path.join(root[len(prefixdir):], f))
+            else:
+                filepath = os.path.abspath(path)
+                # Add its size to the total bytes count (if applicable)
+                if self.follow_links or (not os.path.islink(filepath)):
+                    if self.bytes_expected is not None:
+                        self.bytes_expected += os.path.getsize(filepath)
+                self._check_file(filepath,
+                                 self.filename or os.path.basename(path))
+        # If dry-run mode is on and we got this far, then we should notify
+        # that there aren't any files to upload.
+        if self.dry_run:
+            raise ArvPutUploadNotPending()
+        # Remove local_collection's files that don't exist locally anymore, so the
+        # bytes_written count is correct.
+        for f in self.collection_file_paths(self._local_collection,
+                                            path_prefix=""):
+            if f != 'stdin' and f != self.filename and f not in self._file_paths:
+                self._local_collection.remove(f)
+
     def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
-        if not self.dry_run:
-            self._checkpointer.start()
+        self._checkpointer.start()
         try:
-            for path in self.paths:
-                # Test for stdin first, in case some file named '-' exist
-                if path == '-':
-                    if self.dry_run:
-                        raise ArvPutUploadIsPending()
-                    self._write_stdin(self.filename or 'stdin')
-                elif os.path.isdir(path):
-                    # Use absolute paths on cache index so CWD doesn't interfere
-                    # with the caching logic.
-                    prefixdir = path = os.path.abspath(path)
-                    if prefixdir != '/':
-                        prefixdir += '/'
-                    for root, dirs, files in os.walk(path):
-                        # Make os.walk()'s dir traversing order deterministic
-                        dirs.sort()
-                        files.sort()
-                        for f in files:
-                            self._check_file(os.path.join(root, f),
-                                             os.path.join(root[len(prefixdir):], f))
-                else:
-                    self._check_file(os.path.abspath(path),
-                                     self.filename or os.path.basename(path))
-            # If dry-mode is on, and got up to this point, then we should notify that
-            # there aren't any file to upload.
-            if self.dry_run:
-                raise ArvPutUploadNotPending()
-            # Remove local_collection's files that don't exist locally anymore, so the
-            # bytes_written count is correct.
-            for f in self.collection_file_paths(self._local_collection,
-                                                path_prefix=""):
-                if f != 'stdin' and f != self.filename and not f in self._file_paths:
-                    self._local_collection.remove(f)
             # Update bytes_written from current local collection and
             # report initial progress.
             self._update()
             # Actual file upload
+            self._upload_started = True # Used by the update thread to start checkpointing
             self._upload_files()
+        except (SystemExit, Exception) as e:
+            self._checkpoint_before_quit = False
+            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
+            # Note: We're expecting SystemExit instead of
+            # KeyboardInterrupt because we have a custom signal
+            # handler in place that raises SystemExit with the caught
+            # signal's code.
+            if isinstance(e, PathDoesNotExistError):
+                # We aren't interested in the traceback for this case
+                pass
+            elif not isinstance(e, SystemExit) or e.code != -2:
+                self.logger.warning("Abnormal termination:\n{}".format(
+                    traceback.format_exc()))
+            raise
         finally:
             if not self.dry_run:
                 # Stop the thread before doing anything else
                 self._stop_checkpointer.set()
                 self._checkpointer.join()
-                # Commit all pending blocks & one last _update()
-                self._local_collection.manifest_text()
-                self._update(final=True)
-                if save_collection:
-                    self.save_collection()
+                if self._checkpoint_before_quit:
+                    # Commit all pending blocks & one last _update()
+                    self._local_collection.manifest_text()
+                    self._update(final=True)
+                    if save_collection:
+                        self.save_collection()
             if self.use_cache:
                 self._cache_file.close()
 
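
The dirs[:] slice assignment in _build_upload_list() is what makes directory
exclusion effective: os.walk() keeps consulting the same list object when
deciding which subdirectories to descend into, so filtering it in place prunes
whole subtrees, while rebinding the name would not. A minimal sketch of the
technique (the excluded names are made up):

    import os

    def walk_excluding(top, excluded_dirs, follow_links=True):
        for root, dirs, files in os.walk(top, followlinks=follow_links):
            # Mutate os.walk()'s own list so excluded subtrees are skipped.
            dirs[:] = [d for d in dirs if d not in excluded_dirs]
            # Deterministic traversal order, as in the patch.
            dirs.sort()
            files.sort()
            for f in files:
                yield os.path.join(root, f)

    for path in walk_excluding('.', {'.git', '__pycache__'}):
        print(path)
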
@@ -494,7 +608,7 @@ class ArvPutUploadJob(object):
         Recursively get the total size of the collection
         """
         size = 0
-        for item in collection.values():
+        for item in listvalues(collection):
             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                 size += self._collection_size(item)
             else:
@@ -506,28 +620,34 @@ class ArvPutUploadJob(object):
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
-        while not self._stop_checkpointer.wait(self._update_task_time):
+        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
             self._update()
 
     def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
+        if self._upload_started:
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._local_collection)
+                if self.use_cache:
+                    if final:
+                        manifest = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without committing pending blocks
+                        manifest = self._local_collection.manifest_text(strip=False,
+                                                                        normalize=False,
+                                                                        only_committed=True)
+                    # Update cache
+                    with self._state_lock:
+                        self._state['manifest'] = manifest
             if self.use_cache:
-                if final:
-                    manifest = self._local_collection.manifest_text()
-                else:
-                    # Get the manifest text without comitting pending blocks
-                    manifest = self._local_collection.manifest_text(strip=False,
-                                                                    normalize=False,
-                                                                    only_committed=True)
-                # Update cache
-                with self._state_lock:
-                    self._state['manifest'] = manifest
-        if self.use_cache:
-            self._save_state()
+                try:
+                    self._save_state()
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+        else:
+            self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         self.report_progress()
 
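
The checkpointer runs as a daemon thread waiting on a threading.Event with a
timeout: before uploading starts it polls every second (so the first real
checkpoint happens promptly), afterwards at the configured interval, and
setting the event wakes it immediately for shutdown. A stripped-down sketch of
the same pattern:

    import threading

    class Checkpointer(object):
        def __init__(self, interval=60.0):
            self._interval = interval
            self._upload_started = False
            self._stop = threading.Event()
            self._thread = threading.Thread(target=self._run)
            self._thread.daemon = True
            self._thread.start()

        def _run(self):
            # wait() returns False on timeout, True once the event is set.
            while not self._stop.wait(1 if not self._upload_started else self._interval):
                pass   # checkpoint work would go here

        def upload_started(self):
            self._upload_started = True

        def stop(self):
            self._stop.set()
            self._thread.join()
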
@@ -536,17 +656,22 @@ class ArvPutUploadJob(object):
             self.reporter(self.bytes_written, self.bytes_expected)
 
     def _write_stdin(self, filename):
-        output = self._local_collection.open(filename, 'w')
+        output = self._local_collection.open(filename, 'wb')
         self._write(sys.stdin, output)
         output.close()
 
     def _check_file(self, source, filename):
-        """Check if this file needs to be uploaded"""
+        """
+        Check if this file needs to be uploaded
+        """
+        # Ignore symlinks when requested
+        if (not self.follow_links) and os.path.islink(source):
+            return
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
         # Record file path for updating the remote collection before exiting
-        self._file_paths.append(filename)
+        self._file_paths.add(filename)
 
         with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
@@ -598,21 +723,27 @@ class ArvPutUploadJob(object):
             should_upload = True
 
         if should_upload:
-            self._files_to_upload.append((source, resume_offset, filename))
+            try:
+                self._files_to_upload.append((source, resume_offset, filename))
+            except ArvPutUploadIsPending:
+                # This could happen when running in dry-run mode; close the
+                # cache file to avoid locking issues.
+                self._cache_file.close()
+                raise
 
     def _upload_files(self):
         for source, resume_offset, filename in self._files_to_upload:
-            with open(source, 'r') as source_fd:
+            with open(source, 'rb') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
                     self._state['files'][source]['size'] = os.path.getsize(source)
                 if resume_offset > 0:
                     # Start upload where we left off
-                    output = self._local_collection.open(filename, 'a')
+                    output = self._local_collection.open(filename, 'ab')
                     source_fd.seek(resume_offset)
                 else:
                     # Start from scratch
-                    output = self._local_collection.open(filename, 'w')
+                    output = self._local_collection.open(filename, 'wb')
                 self._write(source_fd, output)
                 output.close(flush=False)
 
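
The switch from 'r'/'w' to 'rb'/'wb' matters under Python 3: text mode decodes
input to str and can fail outright on arbitrary binary data, while the
collection writer deals in raw bytes. For example:

    with open('/tmp/blob.bin', 'wb') as f:
        f.write(b'\xff\xfe\x00data')   # not valid UTF-8

    with open('/tmp/blob.bin', 'rb') as f:
        assert f.read() == b'\xff\xfe\x00data'   # bytes round-trip safely

    # With the default UTF-8 encoding, open('/tmp/blob.bin', 'r').read()
    # would raise UnicodeDecodeError on Python 3.
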
@@ -646,19 +777,21 @@ class ArvPutUploadJob(object):
         if self.use_cache:
             # Set up cache file name from input paths.
             md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update('\0'.join(realpaths))
+            md5.update(b'\0'.join([p.encode() for p in realpaths]))
             if self.filename:
-                md5.update(self.filename)
+                md5.update(self.filename.encode())
             cache_filename = md5.hexdigest()
             cache_filepath = os.path.join(
                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                 cache_filename)
-            if self.resume:
+            if self.resume and os.path.exists(cache_filepath):
+                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'a+')
             else:
                 # --no-resume means start with a empty cache file.
+                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
@@ -675,6 +808,7 @@ class ArvPutUploadJob(object):
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
             else:
+                self.logger.info("No cache usage requested for this run.")
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
@@ -683,7 +817,7 @@ class ArvPutUploadJob(object):
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively go through the entire collection `col`"""
         file_paths = []
-        for name, item in col.items():
+        for name, item in listitems(col):
             if isinstance(item, arvados.arvfile.ArvadosFile):
                 file_paths.append(os.path.join(path_prefix, name))
             elif isinstance(item, arvados.collection.Subcollection):
@@ -701,17 +835,20 @@ class ArvPutUploadJob(object):
         """
         Atomically save current state into cache.
         """
         """
         Atomically save current state into cache.
         """
+        with self._state_lock:
+            # We're not using copy.deepcopy() here because it's a lot slower
+            # than json.dumps(), and we already need the state in JSON format
+            # to save it on disk.
+            state = json.dumps(self._state)
         try:
-            with self._state_lock:
-                state = copy.deepcopy(self._state)
-            new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_filename))
-            self._lock_file(new_cache_fd)
-            new_cache = os.fdopen(new_cache_fd, 'r+')
-            json.dump(state, new_cache)
+            new_cache = tempfile.NamedTemporaryFile(
+                mode='w+',
+                dir=os.path.dirname(self._cache_filename), delete=False)
+            self._lock_file(new_cache)
+            new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_filename)
+            os.rename(new_cache.name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
             self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
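
_save_state() follows the usual write-temp-then-rename recipe for atomic state
files: serialize while holding the lock, write to a temporary file in the same
directory, flush and fsync, then rename over the old file so a reader never
sees a partial write. A minimal sketch without the locking:

    import json
    import os
    import tempfile

    def save_state_atomically(state, cache_filename):
        # json.dumps() is much cheaper than copy.deepcopy() when a JSON
        # copy is what ends up on disk anyway.
        serialized = json.dumps(state)
        new_cache = tempfile.NamedTemporaryFile(
            mode='w+',
            dir=os.path.dirname(cache_filename), delete=False)
        new_cache.write(serialized)
        new_cache.flush()
        os.fsync(new_cache.fileno())
        # Renaming within one filesystem is atomic on POSIX.
        os.rename(new_cache.name, cache_filename)

    save_state_atomically({'manifest': ''}, '/tmp/arv-put-cache.json')
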
@@ -729,7 +866,14 @@ class ArvPutUploadJob(object):
         return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        return self._my_collection().portable_data_hash()
+        pdh = self._my_collection().portable_data_hash()
+        m = self._my_collection().stripped_manifest().encode()
+        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
+        if pdh != local_pdh:
+            self.logger.warning("\n".join([
+                "arv-put: API server provided PDH differs from local manifest.",
+                "         This should not happen; showing API server version."]))
+        return pdh
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         return self._my_collection().manifest_text(stream_name, strip, normalize)
@@ -750,7 +894,7 @@ class ArvPutUploadJob(object):
                     locators.append(loc)
                 return locators
         elif isinstance(item, arvados.collection.Collection):
-            l = [self._datablocks_on_item(x) for x in item.values()]
+            l = [self._datablocks_on_item(x) for x in listvalues(item)]
             # Fast list flattener method taken from:
             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
             return [loc for sublist in l for loc in sublist]
@@ -764,23 +908,24 @@ class ArvPutUploadJob(object):
             datablocks = self._datablocks_on_item(self._my_collection())
         return datablocks
 
-
-def expected_bytes_for(pathlist):
-    # Walk the given directory trees and stat files, adding up file sizes,
-    # so we can display progress as percent
-    bytesum = 0
-    for path in pathlist:
-        if os.path.isdir(path):
-            for filename in arvados.util.listdir_recursive(path):
-                bytesum += os.path.getsize(os.path.join(path, filename))
-        elif not os.path.isfile(path):
-            return None
-        else:
-            bytesum += os.path.getsize(path)
-    return bytesum
-
 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                             os.getpid())
+
+# Simulate glob.glob() matching behavior without the need to scan the filesystem
+# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
+# the pattern 'tests/*.py' will match both 'tests/run_test.py' and
+# 'tests/subdir/run_test.py', so instead we apply it to every path component.
+def pathname_match(pathname, pattern):
+    name = pathname.split(os.sep)
+    # Fix patterns like 'some/subdir/' or 'some//subdir'
+    pat = [x for x in pattern.split(os.sep) if x != '']
+    if len(name) != len(pat):
+        return False
+    for i in range(len(name)):
+        if not fnmatch.fnmatch(name[i], pat[i]):
+            return False
+    return True
+
 def machine_progress(bytes_written, bytes_expected):
     return _machine_format.format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
@@ -816,6 +961,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
     logger = logging.getLogger('arvados.arv_put')
+    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -855,15 +1001,53 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
-    bytes_expected = expected_bytes_for(args.paths)
-
+    # Set up the exclude regex from all the --exclude arguments provided
+    name_patterns = []
+    exclude_paths = []
+    exclude_names = None
+    if len(args.exclude) > 0:
+        # We're supporting 2 kinds of exclusion patterns:
+        # 1) --exclude '*.jpg'      (file/dir name patterns, will only match
+        #                            the name)
+        # 2) --exclude 'foo/bar'    (file/dir path patterns, will match the
+        #                            entire path, and should be relative to
+        #                            any input dir argument)
+        for p in args.exclude:
+            # Only relative path patterns are allowed
+            if p.startswith(os.sep):
+                logger.error("Cannot use absolute paths with --exclude")
+                sys.exit(1)
+            if os.path.dirname(p):
+                # We don't support path patterns that include '.' or '..'
+                p_parts = p.split(os.sep)
+                if '.' in p_parts or '..' in p_parts:
+                    logger.error(
+                        "Cannot use path patterns that include '.' or '..'")
+                    sys.exit(1)
+                # Path search pattern
+                exclude_paths.append(p)
+            else:
+                # Name-only search pattern
+                name_patterns.append(p)
+        # For name-only matching, we can combine all patterns into a single
+        # regexp for better performance.
+        exclude_names = re.compile('|'.join(
+            [fnmatch.translate(p) for p in name_patterns]
+        )) if len(name_patterns) > 0 else None
+        # Show the user the patterns to be used, just in case they weren't
+        # specified inside quotes and got expanded by the shell.
+        logger.info("Exclude patterns: {}".format(args.exclude))
+
+    # If this is used by a human, and there's at least one directory to be
+    # uploaded, the expected bytes calculation can take a moment.
+    if args.progress and any([os.path.isdir(f) for f in args.paths]):
+        logger.info("Calculating upload size, this could take some time...")
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
-                                 bytes_expected = bytes_expected,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  put_threads = args.threads,
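
fnmatch.translate() turns each glob into an anchored regular expression, so
all the name-only patterns can be OR-ed into one compiled regex and checked
with a single match() call per file or directory name. For example:

    import fnmatch
    import re

    name_patterns = ['*.tmp', '*.bak']
    exclude_names = re.compile('|'.join(
        fnmatch.translate(p) for p in name_patterns))

    assert exclude_names.match('scratch.tmp')
    assert exclude_names.match('notes.bak')
    assert exclude_names.match('data.csv') is None
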
@@ -872,7 +1056,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  ensure_unique_name = True,
                                  update_collection = args.update_collection,
                                  logger=logger,
-                                 dry_run=args.dry_run)
+                                 dry_run=args.dry_run,
+                                 follow_links=args.follow_links,
+                                 exclude_paths=exclude_paths,
+                                 exclude_names=exclude_names)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -888,6 +1075,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     except ArvPutUploadNotPending:
         # No files pending for upload
         sys.exit(0)
+    except PathDoesNotExistError as error:
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
+        sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
@@ -908,12 +1099,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
-    except ArvPutUploadIsPending:
-        # Dry run check successful, return proper exit code.
-        sys.exit(2)
-    except ArvPutUploadNotPending:
-        # No files pending for upload
-        sys.exit(0)
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
         logger.info("\n")
@@ -949,7 +1134,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         if not output.endswith('\n'):
             stdout.write('\n')
 
-    for sigcode, orig_handler in orig_signal_handlers.items():
+    for sigcode, orig_handler in listitems(orig_signal_handlers):
         signal.signal(sigcode, orig_handler)
 
     if status != 0: