Merge branch '18027-unmount-fuse'
[arvados.git] / sdk / python / arvados / commands / put.py
index 2dee1dcc137720e78d8a1d1b1306295b6f9a597f..f6f85ba69619ba930cca9efd20d3b4f134f28527 100644 (file)
@@ -1,19 +1,21 @@
-#!/usr/bin/env python
-
-# TODO:
-# --md5sum - display md5 of each file as read from disk
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
 
 from __future__ import division
 
 from __future__ import division
+from future.utils import listitems, listvalues
 from builtins import str
 from builtins import object
 import argparse
 import arvados
 import arvados.collection
 import base64
 from builtins import str
 from builtins import object
 import argparse
 import arvados
 import arvados.collection
 import base64
+import ciso8601
 import copy
 import datetime
 import errno
 import fcntl
 import copy
 import datetime
 import errno
 import fcntl
+import fnmatch
 import hashlib
 import json
 import logging
 import hashlib
 import json
 import logging
@@ -30,10 +32,10 @@ import traceback
 
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
 
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
+from arvados.util import keep_locator_pattern
 
 import arvados.commands._util as arv_cmd
 
 
 import arvados.commands._util as arv_cmd
 
-CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
 api_client = None
 
 upload_opts = argparse.ArgumentParser(add_help=False)
 api_client = None
 
 upload_opts = argparse.ArgumentParser(add_help=False)
@@ -43,7 +45,9 @@ upload_opts.add_argument('--version', action='version',
                          help='Print version and exit.')
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
                          help='Print version and exit.')
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
-Local file or directory. Default: read from standard input.
+Local file or directory. If path is a directory reference with a trailing
+slash, then just upload the directory's contents; otherwise upload the
+directory itself. Default: read from standard input.
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
@@ -73,8 +77,7 @@ Synonym for --stream.
 _group.add_argument('--stream', action='store_true',
                     help="""
 Store the file content and display the resulting manifest on
 _group.add_argument('--stream', action='store_true',
                     help="""
 Store the file content and display the resulting manifest on
-stdout. Do not write the manifest to Keep or save a Collection object
-in Arvados.
+stdout. Do not save a Collection object in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
@@ -138,6 +141,10 @@ physical storage devices (e.g., disks) should have a copy of each data
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
+upload_opts.add_argument('--storage-classes', help="""
+Specify comma separated list of storage classes to be used when saving data to Keep.
+""")
+
 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                          help="""
 Set the number of upload threads to be used. Take into account that
 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                          help="""
 Set the number of upload threads to be used. Take into account that
@@ -147,6 +154,30 @@ On high latency installations, using a greater number will improve
 overall throughput.
 """)
 
 overall throughput.
 """)
 
+upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
+                      action='append', help="""
+Exclude files and directories whose names match the given glob pattern. When
+using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
+directory, relative to the provided input dirs will be excluded.
+When using a filename pattern like '*.txt', any text file will be excluded
+no matter where it is placed.
+For the special case of needing to exclude only files or dirs directly below
+the given input directory, you can use a pattern like './exclude_this.gif'.
+You can specify multiple patterns by using this argument more than once.
+""")
+
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--follow-links', action='store_true', default=True,
+                    dest='follow_links', help="""
+Follow file and directory symlinks (default).
+""")
+_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+                    help="""
+Ignore file and directory symlinks. Even paths given explicitly on the
+command line will be skipped if they are symlinks.
+""")
+
+
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
@@ -178,6 +209,18 @@ Display machine-readable progress on stderr (bytes and, if known,
 total data size).
 """)
 
 total data size).
 """)
 
+run_opts.add_argument('--silent', action='store_true',
+                      help="""
+Do not print any debug messages to console. (Any error messages will
+still be displayed.)
+""")
+
+run_opts.add_argument('--batch', action='store_true', default=False,
+                      help="""
+Retries with '--no-resume --no-cache' if cached state contains invalid/expired
+block signatures.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
                     help="""
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
                     help="""
@@ -198,6 +241,19 @@ _group.add_argument('--no-cache', action='store_false', dest='use_cache',
 Do not save upload state in a cache file for resuming.
 """)
 
 Do not save upload state in a cache file for resuming.
 """)
 
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
+                    help="""
+Set the trash date of the resulting collection to an absolute date in the future.
+The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
+Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone.
+""")
+_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
+                    help="""
+Set the trash date of the resulting collection to an amount of days from the
+date/time that the upload process finishes.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -210,15 +266,14 @@ def parse_arguments(arguments):
 
     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 
 
     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 
-    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
-        if args.filename:
-            arg_parser.error("""
+    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
+        arg_parser.error("""
     --filename argument cannot be used when storing a directory or
     multiple files.
     """)
 
     # Turn on --progress by default if stderr is a tty.
     --filename argument cannot be used when storing a directory or
     multiple files.
     """)
 
     # Turn on --progress by default if stderr is a tty.
-    if (not (args.batch_progress or args.no_progress)
+    if (not (args.batch_progress or args.no_progress or args.silent)
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
@@ -236,9 +291,17 @@ def parse_arguments(arguments):
         if not args.filename:
             args.filename = 'stdin'
 
         if not args.filename:
             args.filename = 'stdin'
 
+    # Remove possible duplicated patterns
+    if len(args.exclude) > 0:
+        args.exclude = list(set(args.exclude))
+
     return args
 
 
     return args
 
 
+class PathDoesNotExistError(Exception):
+    pass
+
+
 class CollectionUpdateError(Exception):
     pass
 
 class CollectionUpdateError(Exception):
     pass
 
@@ -247,6 +310,9 @@ class ResumeCacheConflict(Exception):
     pass
 
 
     pass
 
 
+class ResumeCacheInvalidError(Exception):
+    pass
+
 class ArvPutArgumentConflict(Exception):
     pass
 
 class ArvPutArgumentConflict(Exception):
     pass
 
@@ -270,6 +336,24 @@ class FileUploadList(list):
         super(FileUploadList, self).append(other)
 
 
         super(FileUploadList, self).append(other)
 
 
+# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
+class ArvPutLogFormatter(logging.Formatter):
+    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
+    err_fmtr = None
+    request_id_informed = False
+
+    def __init__(self, request_id):
+        self.err_fmtr = logging.Formatter(
+            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
+            arvados.log_date_format)
+
+    def format(self, record):
+        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
+            self.request_id_informed = True
+            return self.err_fmtr.format(record)
+        return self.std_fmtr.format(record)
+
+
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
@@ -296,7 +380,7 @@ class ResumeCache(object):
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
         except IOError:
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
         except IOError:
-            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 
     def load(self):
         self.cache_file.seek(0)
 
     def load(self):
         self.cache_file.seek(0)
@@ -327,7 +411,7 @@ class ResumeCache(object):
             new_cache = os.fdopen(new_cache_fd, 'r+')
             json.dump(data, new_cache)
             os.rename(new_cache_name, self.filename)
             new_cache = os.fdopen(new_cache_fd, 'r+')
             json.dump(data, new_cache)
             os.rename(new_cache_name, self.filename)
-        except (IOError, OSError, ResumeCacheConflict) as error:
+        except (IOError, OSError, ResumeCacheConflict):
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
@@ -360,17 +444,22 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 bytes_expected=None, name=None, owner_uuid=None,
+                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
                  ensure_unique_name=False, num_retries=None,
                  ensure_unique_name=False, num_retries=None,
-                 put_threads=None, replication_desired=None,
-                 filename=None, update_time=60.0, update_collection=None,
-                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
+                 put_threads=None, replication_desired=None, filename=None,
+                 update_time=60.0, update_collection=None, storage_classes=None,
+                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
+                 follow_links=True, exclude_paths=[], exclude_names=None,
+                 trash_at=None):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
+        self.batch_mode = batch_mode
         self.update = False
         self.reporter = reporter
         self.update = False
         self.reporter = reporter
-        self.bytes_expected = bytes_expected
+        # This will set to 0 before start counting, if no special files are going
+        # to be read.
+        self.bytes_expected = None
         self.bytes_written = 0
         self.bytes_skipped = 0
         self.name = name
         self.bytes_written = 0
         self.bytes_skipped = 0
         self.name = name
@@ -380,6 +469,8 @@ class ArvPutUploadJob(object):
         self.replication_desired = replication_desired
         self.put_threads = put_threads
         self.filename = filename
         self.replication_desired = replication_desired
         self.put_threads = put_threads
         self.filename = filename
+        self.storage_classes = storage_classes
+        self._api_client = api_client
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
@@ -397,6 +488,16 @@ class ArvPutUploadJob(object):
         self.logger = logger
         self.dry_run = dry_run
         self._checkpoint_before_quit = True
         self.logger = logger
         self.dry_run = dry_run
         self._checkpoint_before_quit = True
+        self.follow_links = follow_links
+        self.exclude_paths = exclude_paths
+        self.exclude_names = exclude_names
+        self._trash_at = trash_at
+
+        if self._trash_at is not None:
+            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
+                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
+            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
+                raise TypeError('provided trash_at datetime should be timezone-naive')
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
@@ -408,45 +509,104 @@ class ArvPutUploadJob(object):
         # Load cached data if any and if needed
         self._setup_state(update_collection)
 
         # Load cached data if any and if needed
         self._setup_state(update_collection)
 
+        # Build the upload file list, excluding requested files and counting the
+        # bytes expected to be uploaded.
+        self._build_upload_list()
+
+    def _build_upload_list(self):
+        """
+        Scan the requested paths to count file sizes, excluding requested files
+        and dirs and building the upload file list.
+        """
+        # If there aren't special files to be read, reset total bytes count to zero
+        # to start counting.
+        if not any([p for p in self.paths
+                    if not (os.path.isfile(p) or os.path.isdir(p))]):
+            self.bytes_expected = 0
+
+        for path in self.paths:
+            # Test for stdin first, in case some file named '-' exist
+            if path == '-':
+                if self.dry_run:
+                    raise ArvPutUploadIsPending()
+                self._write_stdin(self.filename or 'stdin')
+            elif not os.path.exists(path):
+                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
+            elif (not self.follow_links) and os.path.islink(path):
+                self.logger.warning("Skipping symlink '{}'".format(path))
+                continue
+            elif os.path.isdir(path):
+                # Use absolute paths on cache index so CWD doesn't interfere
+                # with the caching logic.
+                orig_path = path
+                path = os.path.abspath(path)
+                if orig_path[-1:] == os.sep:
+                    # When passing a directory reference with a trailing slash,
+                    # its contents should be uploaded directly to the
+                    # collection's root.
+                    prefixdir = path
+                else:
+                    # When passing a directory reference with no trailing slash,
+                    # upload the directory to the collection's root.
+                    prefixdir = os.path.dirname(path)
+                prefixdir += os.sep
+                for root, dirs, files in os.walk(path,
+                                                 followlinks=self.follow_links):
+                    root_relpath = os.path.relpath(root, path)
+                    if root_relpath == '.':
+                        root_relpath = ''
+                    # Exclude files/dirs by full path matching pattern
+                    if self.exclude_paths:
+                        dirs[:] = [d for d in dirs
+                                   if not any(pathname_match(
+                                           os.path.join(root_relpath, d), pat)
+                                              for pat in self.exclude_paths)]
+                        files = [f for f in files
+                                 if not any(pathname_match(
+                                         os.path.join(root_relpath, f), pat)
+                                            for pat in self.exclude_paths)]
+                    # Exclude files/dirs by name matching pattern
+                    if self.exclude_names is not None:
+                        dirs[:] = [d for d in dirs
+                                   if not self.exclude_names.match(d)]
+                        files = [f for f in files
+                                 if not self.exclude_names.match(f)]
+                    # Make os.walk()'s dir traversing order deterministic
+                    dirs.sort()
+                    files.sort()
+                    for f in files:
+                        filepath = os.path.join(root, f)
+                        # Add its size to the total bytes count (if applicable)
+                        if self.follow_links or (not os.path.islink(filepath)):
+                            if self.bytes_expected is not None:
+                                self.bytes_expected += os.path.getsize(filepath)
+                        self._check_file(filepath,
+                                         os.path.join(root[len(prefixdir):], f))
+            else:
+                filepath = os.path.abspath(path)
+                # Add its size to the total bytes count (if applicable)
+                if self.follow_links or (not os.path.islink(filepath)):
+                    if self.bytes_expected is not None:
+                        self.bytes_expected += os.path.getsize(filepath)
+                self._check_file(filepath,
+                                 self.filename or os.path.basename(path))
+        # If dry-mode is on, and got up to this point, then we should notify that
+        # there aren't any file to upload.
+        if self.dry_run:
+            raise ArvPutUploadNotPending()
+        # Remove local_collection's files that don't exist locally anymore, so the
+        # bytes_written count is correct.
+        for f in self.collection_file_paths(self._local_collection,
+                                            path_prefix=""):
+            if f != 'stdin' and f != self.filename and not f in self._file_paths:
+                self._local_collection.remove(f)
+
     def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
     def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
-        if not self.dry_run:
-            self._checkpointer.start()
+        self._checkpointer.start()
         try:
         try:
-            for path in self.paths:
-                # Test for stdin first, in case some file named '-' exist
-                if path == '-':
-                    if self.dry_run:
-                        raise ArvPutUploadIsPending()
-                    self._write_stdin(self.filename or 'stdin')
-                elif os.path.isdir(path):
-                    # Use absolute paths on cache index so CWD doesn't interfere
-                    # with the caching logic.
-                    prefixdir = path = os.path.abspath(path)
-                    if prefixdir != '/':
-                        prefixdir += '/'
-                    for root, dirs, files in os.walk(path):
-                        # Make os.walk()'s dir traversing order deterministic
-                        dirs.sort()
-                        files.sort()
-                        for f in files:
-                            self._check_file(os.path.join(root, f),
-                                             os.path.join(root[len(prefixdir):], f))
-                else:
-                    self._check_file(os.path.abspath(path),
-                                     self.filename or os.path.basename(path))
-            # If dry-mode is on, and got up to this point, then we should notify that
-            # there aren't any file to upload.
-            if self.dry_run:
-                raise ArvPutUploadNotPending()
-            # Remove local_collection's files that don't exist locally anymore, so the
-            # bytes_written count is correct.
-            for f in self.collection_file_paths(self._local_collection,
-                                                path_prefix=""):
-                if f != 'stdin' and f != self.filename and not f in self._file_paths:
-                    self._local_collection.remove(f)
             # Update bytes_written from current local collection and
             # report initial progress.
             self._update()
             # Update bytes_written from current local collection and
             # report initial progress.
             self._update()
@@ -456,10 +616,14 @@ class ArvPutUploadJob(object):
         except (SystemExit, Exception) as e:
             self._checkpoint_before_quit = False
             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
         except (SystemExit, Exception) as e:
             self._checkpoint_before_quit = False
             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
-            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
-            #   we have a custom signal handler in place that raises SystemExit with
-            #   the catched signal's code.
-            if not isinstance(e, SystemExit) or e.code != -2:
+            # Note: We're expecting SystemExit instead of
+            # KeyboardInterrupt because we have a custom signal
+            # handler in place that raises SystemExit with the catched
+            # signal's code.
+            if isinstance(e, PathDoesNotExistError):
+                # We aren't interested in the traceback for this case
+                pass
+            elif not isinstance(e, SystemExit) or e.code != -2:
                 self.logger.warning("Abnormal termination:\n{}".format(
                     traceback.format_exc()))
             raise
                 self.logger.warning("Abnormal termination:\n{}".format(
                     traceback.format_exc()))
             raise
@@ -477,6 +641,17 @@ class ArvPutUploadJob(object):
             if self.use_cache:
                 self._cache_file.close()
 
             if self.use_cache:
                 self._cache_file.close()
 
+    def _collection_trash_at(self):
+        """
+        Returns the trash date that the collection should use at save time.
+        Takes into account absolute/relative trash_at values requested
+        by the user.
+        """
+        if type(self._trash_at) == datetime.timedelta:
+            # Get an absolute datetime for trash_at
+            return datetime.datetime.utcnow() + self._trash_at
+        return self._trash_at
+
     def save_collection(self):
         if self.update:
             # Check if files should be updated on the remote collection.
     def save_collection(self):
         if self.update:
             # Check if files should be updated on the remote collection.
@@ -491,12 +666,17 @@ class ArvPutUploadJob(object):
                 else:
                     # The file already exist on remote collection, skip it.
                     pass
                 else:
                     # The file already exist on remote collection, skip it.
                     pass
-            self._remote_collection.save(num_retries=self.num_retries)
+            self._remote_collection.save(num_retries=self.num_retries,
+                                         trash_at=self._collection_trash_at())
         else:
         else:
+            if len(self._local_collection) == 0:
+                self.logger.warning("No files were uploaded, skipping collection creation.")
+                return
             self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
             self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
-                num_retries=self.num_retries)
+                num_retries=self.num_retries,
+                trash_at=self._collection_trash_at())
 
     def destroy_cache(self):
         if self.use_cache:
 
     def destroy_cache(self):
         if self.use_cache:
@@ -513,7 +693,7 @@ class ArvPutUploadJob(object):
         Recursively get the total size of the collection
         """
         size = 0
         Recursively get the total size of the collection
         """
         size = 0
-        for item in list(collection.values()):
+        for item in listvalues(collection):
             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                 size += self._collection_size(item)
             else:
             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                 size += self._collection_size(item)
             else:
@@ -547,7 +727,19 @@ class ArvPutUploadJob(object):
                     with self._state_lock:
                         self._state['manifest'] = manifest
             if self.use_cache:
                     with self._state_lock:
                         self._state['manifest'] = manifest
             if self.use_cache:
-                self._save_state()
+                try:
+                    self._save_state()
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+            # Keep remote collection's trash_at attribute synced when using relative expire dates
+            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
+                try:
+                    self._api_client.collections().update(
+                        uuid=self._remote_collection.manifest_locator(),
+                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
+                    ).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
@@ -558,12 +750,17 @@ class ArvPutUploadJob(object):
             self.reporter(self.bytes_written, self.bytes_expected)
 
     def _write_stdin(self, filename):
             self.reporter(self.bytes_written, self.bytes_expected)
 
     def _write_stdin(self, filename):
-        output = self._local_collection.open(filename, 'w')
-        self._write(sys.stdin, output)
+        output = self._local_collection.open(filename, 'wb')
+        self._write(sys.stdin.buffer, output)
         output.close()
 
     def _check_file(self, source, filename):
         output.close()
 
     def _check_file(self, source, filename):
-        """Check if this file needs to be uploaded"""
+        """
+        Check if this file needs to be uploaded
+        """
+        # Ignore symlinks when requested
+        if (not self.follow_links) and os.path.islink(source):
+            return
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
@@ -598,6 +795,7 @@ class ArvPutUploadJob(object):
             elif file_in_local_collection.permission_expired():
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
             elif file_in_local_collection.permission_expired():
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
+                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                 should_upload = True
                 self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
                 should_upload = True
                 self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
@@ -612,7 +810,7 @@ class ArvPutUploadJob(object):
                 # Inconsistent cache, re-upload the file
                 should_upload = True
                 self._local_collection.remove(filename)
                 # Inconsistent cache, re-upload the file
                 should_upload = True
                 self._local_collection.remove(filename)
-                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
         # Local file differs from cached data, re-upload it.
         else:
             if file_in_local_collection:
         # Local file differs from cached data, re-upload it.
         else:
             if file_in_local_collection:
@@ -620,21 +818,27 @@ class ArvPutUploadJob(object):
             should_upload = True
 
         if should_upload:
             should_upload = True
 
         if should_upload:
-            self._files_to_upload.append((source, resume_offset, filename))
+            try:
+                self._files_to_upload.append((source, resume_offset, filename))
+            except ArvPutUploadIsPending:
+                # This could happen when running on dry-mode, close cache file to
+                # avoid locking issues.
+                self._cache_file.close()
+                raise
 
     def _upload_files(self):
         for source, resume_offset, filename in self._files_to_upload:
 
     def _upload_files(self):
         for source, resume_offset, filename in self._files_to_upload:
-            with open(source, 'r') as source_fd:
+            with open(source, 'rb') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
                     self._state['files'][source]['size'] = os.path.getsize(source)
                 if resume_offset > 0:
                     # Start upload where we left off
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
                     self._state['files'][source]['size'] = os.path.getsize(source)
                 if resume_offset > 0:
                     # Start upload where we left off
-                    output = self._local_collection.open(filename, 'a')
+                    output = self._local_collection.open(filename, 'ab')
                     source_fd.seek(resume_offset)
                 else:
                     # Start from scratch
                     source_fd.seek(resume_offset)
                 else:
                     # Start from scratch
-                    output = self._local_collection.open(filename, 'w')
+                    output = self._local_collection.open(filename, 'wb')
                 self._write(source_fd, output)
                 output.close(flush=False)
 
                 self._write(source_fd, output)
                 output.close(flush=False)
 
@@ -648,6 +852,20 @@ class ArvPutUploadJob(object):
     def _my_collection(self):
         return self._remote_collection if self.update else self._local_collection
 
     def _my_collection(self):
         return self._remote_collection if self.update else self._local_collection
 
+    def _get_cache_filepath(self):
+        # Set up cache file name from input paths.
+        md5 = hashlib.md5()
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
+        realpaths = sorted(os.path.realpath(path) for path in self.paths)
+        md5.update(b'\0'.join([p.encode() for p in realpaths]))
+        if self.filename:
+            md5.update(self.filename.encode())
+        cache_filename = md5.hexdigest()
+        cache_filepath = os.path.join(
+            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+            cache_filename)
+        return cache_filepath
+
     def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
     def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
@@ -656,7 +874,11 @@ class ArvPutUploadJob(object):
         if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                           update_collection):
             try:
         if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                           update_collection):
             try:
-                self._remote_collection = arvados.collection.Collection(update_collection)
+                self._remote_collection = arvados.collection.Collection(
+                    update_collection,
+                    api_client=self._api_client,
+                    storage_classes_desired=self.storage_classes,
+                    num_retries=self.num_retries)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -666,21 +888,13 @@ class ArvPutUploadJob(object):
             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 
         if self.use_cache:
             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 
         if self.use_cache:
-            # Set up cache file name from input paths.
-            md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
-            realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update(b'\0'.join([p.encode() for p in realpaths]))
-            if self.filename:
-                md5.update(self.filename.encode())
-            cache_filename = md5.hexdigest()
-            cache_filepath = os.path.join(
-                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-                cache_filename)
-            if self.resume:
+            cache_filepath = self._get_cache_filepath()
+            if self.resume and os.path.exists(cache_filepath):
+                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'a+')
             else:
                 # --no-resume means start with a empty cache file.
                 self._cache_file = open(cache_filepath, 'a+')
             else:
                 # --no-resume means start with a empty cache file.
+                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
                 self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
@@ -697,15 +911,71 @@ class ArvPutUploadJob(object):
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
             else:
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
             else:
+                self.logger.info("No cache usage requested for this run.")
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
+            if not self._cached_manifest_valid():
+                if not self.batch_mode:
+                    raise ResumeCacheInvalidError()
+                else:
+                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
+                    self.use_cache = False # Don't overwrite preexisting cache file.
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
             # Load the previous manifest so we can check if files were modified remotely.
-            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
+            self._local_collection = arvados.collection.Collection(
+                self._state['manifest'],
+                replication_desired=self.replication_desired,
+                storage_classes_desired=self.storage_classes,
+                put_threads=self.put_threads,
+                api_client=self._api_client,
+                num_retries=self.num_retries)
+
+    def _cached_manifest_valid(self):
+        """
+        Validate the oldest non-expired block signature to check if cached manifest
+        is usable: checking if the cached manifest was not created with a different
+        arvados account.
+        """
+        if self._state.get('manifest', None) is None:
+            # No cached manifest yet, all good.
+            return True
+        now = datetime.datetime.utcnow()
+        oldest_exp = None
+        oldest_loc = None
+        block_found = False
+        for m in keep_locator_pattern.finditer(self._state['manifest']):
+            loc = m.group(0)
+            try:
+                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
+            except IndexError:
+                # Locator without signature
+                continue
+            block_found = True
+            if exp > now and (oldest_exp is None or exp < oldest_exp):
+                oldest_exp = exp
+                oldest_loc = loc
+        if not block_found:
+            # No block signatures found => no invalid block signatures.
+            return True
+        if oldest_loc is None:
+            # Locator signatures found, but all have expired.
+            # Reset the cache and move on.
+            self.logger.info('Cache expired, starting from scratch.')
+            self._state['manifest'] = ''
+            return True
+        kc = arvados.KeepClient(api_client=self._api_client,
+                                num_retries=self.num_retries)
+        try:
+            kc.head(oldest_loc)
+        except arvados.errors.KeepRequestError:
+            # Something is wrong, cached manifest is not valid.
+            return False
+        return True
 
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively go through the entire collection `col`"""
         file_paths = []
 
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively go through the entire collection `col`"""
         file_paths = []
-        for name, item in list(col.items()):
+        for name, item in listitems(col):
             if isinstance(item, arvados.arvfile.ArvadosFile):
                 file_paths.append(os.path.join(path_prefix, name))
             elif isinstance(item, arvados.collection.Subcollection):
             if isinstance(item, arvados.arvfile.ArvadosFile):
                 file_paths.append(os.path.join(path_prefix, name))
             elif isinstance(item, arvados.collection.Subcollection):
@@ -717,26 +987,26 @@ class ArvPutUploadJob(object):
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
         except IOError:
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
         except IOError:
-            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 
     def _save_state(self):
         """
         Atomically save current state into cache.
         """
 
     def _save_state(self):
         """
         Atomically save current state into cache.
         """
+        with self._state_lock:
+            # We're not using copy.deepcopy() here because it's a lot slower
+            # than json.dumps(), and we're already needing JSON format to be
+            # saved on disk.
+            state = json.dumps(self._state)
         try:
         try:
-            with self._state_lock:
-                # We're not using copy.deepcopy() here because it's a lot slower
-                # than json.dumps(), and we're already needing JSON format to be
-                # saved on disk.
-                state = json.dumps(self._state)
-            new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_filename))
-            self._lock_file(new_cache_fd)
-            new_cache = os.fdopen(new_cache_fd, 'r+')
+            new_cache = tempfile.NamedTemporaryFile(
+                mode='w+',
+                dir=os.path.dirname(self._cache_filename), delete=False)
+            self._lock_file(new_cache)
             new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
             new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_filename)
+            os.rename(new_cache.name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
             self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
         except (IOError, OSError, ResumeCacheConflict) as error:
             self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
@@ -750,6 +1020,9 @@ class ArvPutUploadJob(object):
     def collection_name(self):
         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def collection_name(self):
         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
+    def collection_trash_at(self):
+        return self._my_collection().get_trash_at()
+
     def manifest_locator(self):
         return self._my_collection().manifest_locator()
 
     def manifest_locator(self):
         return self._my_collection().manifest_locator()
 
@@ -758,7 +1031,7 @@ class ArvPutUploadJob(object):
         m = self._my_collection().stripped_manifest().encode()
         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
         if pdh != local_pdh:
         m = self._my_collection().stripped_manifest().encode()
         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
         if pdh != local_pdh:
-            logger.warning("\n".join([
+            self.logger.warning("\n".join([
                 "arv-put: API server provided PDH differs from local manifest.",
                 "         This should not happen; showing API server version."]))
         return pdh
                 "arv-put: API server provided PDH differs from local manifest.",
                 "         This should not happen; showing API server version."]))
         return pdh
@@ -782,7 +1055,7 @@ class ArvPutUploadJob(object):
                     locators.append(loc)
                 return locators
         elif isinstance(item, arvados.collection.Collection):
                     locators.append(loc)
                 return locators
         elif isinstance(item, arvados.collection.Collection):
-            l = [self._datablocks_on_item(x) for x in list(item.values())]
+            l = [self._datablocks_on_item(x) for x in listvalues(item)]
             # Fast list flattener method taken from:
             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
             return [loc for sublist in l for loc in sublist]
             # Fast list flattener method taken from:
             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
             return [loc for sublist in l for loc in sublist]
@@ -796,23 +1069,24 @@ class ArvPutUploadJob(object):
             datablocks = self._datablocks_on_item(self._my_collection())
         return datablocks
 
             datablocks = self._datablocks_on_item(self._my_collection())
         return datablocks
 
-
-def expected_bytes_for(pathlist):
-    # Walk the given directory trees and stat files, adding up file sizes,
-    # so we can display progress as percent
-    bytesum = 0
-    for path in pathlist:
-        if os.path.isdir(path):
-            for filename in arvados.util.listdir_recursive(path):
-                bytesum += os.path.getsize(os.path.join(path, filename))
-        elif not os.path.isfile(path):
-            return None
-        else:
-            bytesum += os.path.getsize(path)
-    return bytesum
-
 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                             os.getpid())
 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                             os.getpid())
+
+# Simulate glob.glob() matching behavior without the need to scan the filesystem
+# Note: fnmatch() doesn't work correctly when used with pathnames. For example the
+# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
+# so instead we're using it on every path component.
+def pathname_match(pathname, pattern):
+    name = pathname.split(os.sep)
+    # Fix patterns like 'some/subdir/' or 'some//subdir'
+    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
+    if len(name) != len(pat):
+        return False
+    for i in range(len(name)):
+        if not fnmatch.fnmatch(name[i], pat[i]):
+            return False
+    return True
+
 def machine_progress(bytes_written, bytes_expected):
     return _machine_format.format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
 def machine_progress(bytes_written, bytes_expected):
     return _machine_format.format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
@@ -830,9 +1104,6 @@ def progress_writer(progress_func, outfile=sys.stderr):
         outfile.write(progress_func(bytes_written, bytes_expected))
     return write_progress
 
         outfile.write(progress_func(bytes_written, bytes_expected))
     return write_progress
 
-def exit_signal_handler(sigcode, frame):
-    sys.exit(-sigcode)
-
 def desired_project_uuid(api_client, project_uuid, num_retries):
     if not project_uuid:
         query = api_client.users().current()
 def desired_project_uuid(api_client, project_uuid, num_retries):
     if not project_uuid:
         query = api_client.users().current()
@@ -844,15 +1115,66 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
     return query.execute(num_retries=num_retries)['uuid']
 
         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
     return query.execute(num_retries=num_retries)['uuid']
 
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
+         install_sig_handlers=True):
     global api_client
 
     global api_client
 
-    logger = logging.getLogger('arvados.arv_put')
-    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
     args = parse_arguments(arguments)
+    logger = logging.getLogger('arvados.arv_put')
+    if args.silent:
+        logger.setLevel(logging.WARNING)
+    else:
+        logger.setLevel(logging.INFO)
     status = 0
     status = 0
+
+    request_id = arvados.util.new_request_id()
+
+    formatter = ArvPutLogFormatter(request_id)
+    logging.getLogger('arvados').handlers[0].setFormatter(formatter)
+
     if api_client is None:
     if api_client is None:
-        api_client = arvados.api('v1')
+        api_client = arvados.api('v1', request_id=request_id)
+
+    if install_sig_handlers:
+        arv_cmd.install_signal_handlers()
+
+    # Trash arguments validation
+    trash_at = None
+    if args.trash_at is not None:
+        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
+        # make sure the user provides a complete YYYY-MM-DD date.
+        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        # Check if no time information was provided. In that case, assume end-of-day.
+        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
+            args.trash_at += 'T23:59:59'
+        try:
+            trash_at = ciso8601.parse_datetime(args.trash_at)
+        except:
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        else:
+            if trash_at.tzinfo is not None:
+                # Timezone aware datetime provided.
+                utcoffset = -trash_at.utcoffset()
+            else:
+                # Timezone naive datetime provided. Assume is local.
+                if time.daylight:
+                    utcoffset = datetime.timedelta(seconds=time.altzone)
+                else:
+                    utcoffset = datetime.timedelta(seconds=time.timezone)
+            # Convert to UTC timezone naive datetime.
+            trash_at = trash_at.replace(tzinfo=None) + utcoffset
+
+        if trash_at <= datetime.datetime.utcnow():
+            logger.error("--trash-at argument must be set in the future")
+            sys.exit(1)
+    if args.trash_after is not None:
+        if args.trash_after < 1:
+            logger.error("--trash-after argument must be >= 1")
+            sys.exit(1)
+        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
 
     # Determine the name to use
     if args.name:
 
     # Determine the name to use
     if args.name:
@@ -888,19 +1210,62 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
     else:
         reporter = None
 
+    #  Split storage-classes argument
+    storage_classes = None
+    if args.storage_classes:
+        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
+
+    # Setup exclude regex from all the --exclude arguments provided
+    name_patterns = []
+    exclude_paths = []
+    exclude_names = None
+    if len(args.exclude) > 0:
+        # We're supporting 2 kinds of exclusion patterns:
+        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
+        #                            the name, wherever the file is on the tree)
+        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
+        #                            entire path, and should be relative to
+        #                            any input dir argument)
+        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
+        #                            placed directly underneath the input dir)
+        for p in args.exclude:
+            # Only relative paths patterns allowed
+            if p.startswith(os.sep):
+                logger.error("Cannot use absolute paths with --exclude")
+                sys.exit(1)
+            if os.path.dirname(p):
+                # We don't support of path patterns with '..'
+                p_parts = p.split(os.sep)
+                if '..' in p_parts:
+                    logger.error(
+                        "Cannot use path patterns that include or '..'")
+                    sys.exit(1)
+                # Path search pattern
+                exclude_paths.append(p)
+            else:
+                # Name-only search pattern
+                name_patterns.append(p)
+        # For name only matching, we can combine all patterns into a single
+        # regexp, for better performance.
+        exclude_names = re.compile('|'.join(
+            [fnmatch.translate(p) for p in name_patterns]
+        )) if len(name_patterns) > 0 else None
+        # Show the user the patterns to be used, just in case they weren't
+        # specified inside quotes and got changed by the shell expansion.
+        logger.info("Exclude patterns: {}".format(args.exclude))
+
     # If this is used by a human, and there's at least one directory to be
     # uploaded, the expected bytes calculation can take a moment.
     if args.progress and any([os.path.isdir(f) for f in args.paths]):
         logger.info("Calculating upload size, this could take some time...")
     # If this is used by a human, and there's at least one directory to be
     # uploaded, the expected bytes calculation can take a moment.
     if args.progress and any([os.path.isdir(f) for f in args.paths]):
         logger.info("Calculating upload size, this could take some time...")
-    bytes_expected = expected_bytes_for(args.paths)
-
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
+                                 batch_mode= args.batch,
                                  filename = args.filename,
                                  reporter = reporter,
                                  filename = args.filename,
                                  reporter = reporter,
-                                 bytes_expected = bytes_expected,
+                                 api_client = api_client,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  put_threads = args.threads,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  put_threads = args.threads,
@@ -908,14 +1273,28 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,
                                  update_collection = args.update_collection,
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,
                                  update_collection = args.update_collection,
+                                 storage_classes=storage_classes,
                                  logger=logger,
                                  logger=logger,
-                                 dry_run=args.dry_run)
+                                 dry_run=args.dry_run,
+                                 follow_links=args.follow_links,
+                                 exclude_paths=exclude_paths,
+                                 exclude_names=exclude_names,
+                                 trash_at=trash_at)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
             "         Use --no-cache if this is really what you want."]))
         sys.exit(1)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
             "         Use --no-cache if this is really what you want."]))
         sys.exit(1)
-    except CollectionUpdateError as error:
+    except ResumeCacheInvalidError:
+        logger.error("\n".join([
+            "arv-put: Resume cache contains invalid signature: it may have expired",
+            "         or been created with another Arvados user's credentials.",
+            "         Switch user or use one of the following options to restart upload:",
+            "         --no-resume to start a new resume cache.",
+            "         --no-cache to disable resume cache.",
+            "         --batch to ignore the resume cache if invalid."]))
+        sys.exit(1)
+    except (CollectionUpdateError, PathDoesNotExistError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
@@ -926,11 +1305,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         # No files pending for upload
         sys.exit(0)
 
         # No files pending for upload
         sys.exit(0)
 
-    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
-    # the originals.
-    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
-                            for sigcode in CAUGHT_SIGNALS}
-
     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
             "arv-put: Resuming previous upload from last checkpoint.",
     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
             "arv-put: Resuming previous upload from last checkpoint.",
@@ -941,16 +1315,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
-    except arvados.errors.ApiError as error:
+    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
-    except ArvPutUploadIsPending:
-        # Dry run check successful, return proper exit code.
-        sys.exit(2)
-    except ArvPutUploadNotPending:
-        # No files pending for upload
-        sys.exit(0)
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
         logger.info("\n")
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
         logger.info("\n")
@@ -962,12 +1330,23 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             output = writer.manifest_text()
     elif args.raw:
         output = ','.join(writer.data_locators())
             output = writer.manifest_text()
     elif args.raw:
         output = ','.join(writer.data_locators())
-    else:
+    elif writer.manifest_locator() is not None:
         try:
         try:
+            expiration_notice = ""
+            if writer.collection_trash_at() is not None:
+                # Get the local timezone-naive version, and log it with timezone information.
+                if time.daylight:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
+                else:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
+                expiration_notice = ". It will expire on {} {}.".format(
+                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
             if args.update_collection:
             if args.update_collection:
-                logger.info("Collection updated: '{}'".format(writer.collection_name()))
+                logger.info(u"Collection updated: '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             else:
             else:
-                logger.info("Collection saved as '{}'".format(writer.collection_name()))
+                logger.info(u"Collection saved as '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
@@ -977,17 +1356,19 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
+    else:
+        status = 1
 
     # Print the locator (uuid) of the new collection.
     if output is None:
         status = status or 1
 
     # Print the locator (uuid) of the new collection.
     if output is None:
         status = status or 1
-    else:
+    elif not args.silent:
         stdout.write(output)
         if not output.endswith('\n'):
             stdout.write('\n')
 
         stdout.write(output)
         if not output.endswith('\n'):
             stdout.write('\n')
 
-    for sigcode, orig_handler in list(orig_signal_handlers.items()):
-        signal.signal(sigcode, orig_handler)
+    if install_sig_handlers:
+        arv_cmd.restore_signal_handlers()
 
     if status != 0:
         sys.exit(status)
 
     if status != 0:
         sys.exit(status)