Merge branch '17936-arvput-batch-mode' into main. Closes #17936.
[arvados.git] / sdk / python / arvados / commands / put.py
index 5dde8e53c933d05b2facbf8df284941635da3b42..f6f85ba69619ba930cca9efd20d3b4f134f28527 100644 (file)
@@ -10,6 +10,7 @@ import argparse
 import arvados
 import arvados.collection
 import base64
+import ciso8601
 import copy
 import datetime
 import errno
@@ -31,10 +32,10 @@ import traceback
 
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
+from arvados.util import keep_locator_pattern
 
 import arvados.commands._util as arv_cmd
 
-CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
 api_client = None
 
 upload_opts = argparse.ArgumentParser(add_help=False)
@@ -76,8 +77,7 @@ Synonym for --stream.
 _group.add_argument('--stream', action='store_true',
                     help="""
 Store the file content and display the resulting manifest on
-stdout. Do not write the manifest to Keep or save a Collection object
-in Arvados.
+stdout. Do not save a Collection object in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
@@ -141,6 +141,10 @@ physical storage devices (e.g., disks) should have a copy of each data
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
+upload_opts.add_argument('--storage-classes', help="""
+Specify comma separated list of storage classes to be used when saving data to Keep.
+""")
+
 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                          help="""
 Set the number of upload threads to be used. Take into account that
@@ -150,6 +154,30 @@ On high latency installations, using a greater number will improve
 overall throughput.
 """)
 
+upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
+                      action='append', help="""
+Exclude files and directories whose names match the given glob pattern. When
+using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
+directory, relative to the provided input dirs will be excluded.
+When using a filename pattern like '*.txt', any text file will be excluded
+no matter where it is placed.
+For the special case of needing to exclude only files or dirs directly below
+the given input directory, you can use a pattern like './exclude_this.gif'.
+You can specify multiple patterns by using this argument more than once.
+""")
+
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--follow-links', action='store_true', default=True,
+                    dest='follow_links', help="""
+Follow file and directory symlinks (default).
+""")
+_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+                    help="""
+Ignore file and directory symlinks. Even paths given explicitly on the
+command line will be skipped if they are symlinks.
+""")
+
+
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
@@ -161,18 +189,6 @@ run_opts.add_argument('--name', help="""
 Save the collection with the specified name.
 """)
 
-run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
-                      action='append', help="""
-Exclude files and directories whose names match the given glob pattern. When
-using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
-directory, relative to the provided input dirs will be excluded.
-When using a filename pattern like '*.txt', any text file will be excluded
-no matter where is placed.
-For the special case of needing to exclude only files or dirs directly below
-the given input directory, you can use a pattern like './exclude_this.gif'.
-You can specify multiple patterns by using this argument more than once.
-""")
-
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
                     help="""
@@ -199,6 +215,12 @@ Do not print any debug messages to console. (Any error messages will
 still be displayed.)
 """)
 
+run_opts.add_argument('--batch', action='store_true', default=False,
+                      help="""
+Retries with '--no-resume --no-cache' if cached state contains invalid/expired
+block signatures.
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
                     help="""
@@ -209,16 +231,6 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
-_group = run_opts.add_mutually_exclusive_group()
-_group.add_argument('--follow-links', action='store_true', default=True,
-                    dest='follow_links', help="""
-Follow file and directory symlinks (default).
-""")
-_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
-                    help="""
-Do not follow file and directory symlinks.
-""")
-
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                     help="""
@@ -229,6 +241,19 @@ _group.add_argument('--no-cache', action='store_false', dest='use_cache',
 Do not save upload state in a cache file for resuming.
 """)
 
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
+                    help="""
+Set the trash date of the resulting collection to an absolute date in the future.
+The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
+Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone.
+""")
+_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
+                    help="""
+Set the trash date of the resulting collection to an amount of days from the
+date/time that the upload process finishes.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -241,9 +266,8 @@ def parse_arguments(arguments):
 
     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 
-    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
-        if args.filename:
-            arg_parser.error("""
+    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
+        arg_parser.error("""
     --filename argument cannot be used when storing a directory or
     multiple files.
     """)
@@ -286,6 +310,9 @@ class ResumeCacheConflict(Exception):
     pass
 
 
+class ResumeCacheInvalidError(Exception):
+    pass
+
 class ArvPutArgumentConflict(Exception):
     pass
 
@@ -353,7 +380,7 @@ class ResumeCache(object):
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
         except IOError:
-            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 
     def load(self):
         self.cache_file.seek(0)
@@ -384,7 +411,7 @@ class ResumeCache(object):
             new_cache = os.fdopen(new_cache_fd, 'r+')
             json.dump(data, new_cache)
             os.rename(new_cache_name, self.filename)
-        except (IOError, OSError, ResumeCacheConflict) as error:
+        except (IOError, OSError, ResumeCacheConflict):
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
@@ -417,15 +444,17 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 name=None, owner_uuid=None, api_client=None,
+                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
                  ensure_unique_name=False, num_retries=None,
-                 put_threads=None, replication_desired=None,
-                 filename=None, update_time=60.0, update_collection=None,
+                 put_threads=None, replication_desired=None, filename=None,
+                 update_time=60.0, update_collection=None, storage_classes=None,
                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
-                 follow_links=True, exclude_paths=[], exclude_names=None):
+                 follow_links=True, exclude_paths=[], exclude_names=None,
+                 trash_at=None):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
+        self.batch_mode = batch_mode
         self.update = False
         self.reporter = reporter
         # This will set to 0 before start counting, if no special files are going
@@ -440,6 +469,7 @@ class ArvPutUploadJob(object):
         self.replication_desired = replication_desired
         self.put_threads = put_threads
         self.filename = filename
+        self.storage_classes = storage_classes
         self._api_client = api_client
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
@@ -461,6 +491,13 @@ class ArvPutUploadJob(object):
         self.follow_links = follow_links
         self.exclude_paths = exclude_paths
         self.exclude_names = exclude_names
+        self._trash_at = trash_at
+
+        if self._trash_at is not None:
+            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
+                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
+            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
+                raise TypeError('provided trash_at datetime should be timezone-naive')
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
@@ -478,8 +515,8 @@ class ArvPutUploadJob(object):
 
     def _build_upload_list(self):
         """
-        Scan the requested paths to count file sizes, excluding files & dirs if requested
-        and building the upload file list.
+        Scan the requested paths to count file sizes, skipping any excluded files
+        and dirs, and build the upload file list.
         """
         # If there aren't special files to be read, reset total bytes count to zero
         # to start counting.
@@ -494,7 +531,10 @@ class ArvPutUploadJob(object):
                     raise ArvPutUploadIsPending()
                 self._write_stdin(self.filename or 'stdin')
             elif not os.path.exists(path):
-                 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
+            elif (not self.follow_links) and os.path.islink(path):
+                self.logger.warning("Skipping symlink '{}'".format(path))
+                continue
             elif os.path.isdir(path):
                 # Use absolute paths on cache index so CWD doesn't interfere
                 # with the caching logic.
@@ -601,6 +641,17 @@ class ArvPutUploadJob(object):
             if self.use_cache:
                 self._cache_file.close()
 
+    def _collection_trash_at(self):
+        """
+        Returns the trash date that the collection should use at save time.
+        Takes into account absolute/relative trash_at values requested
+        by the user.
+        """
+        if type(self._trash_at) == datetime.timedelta:
+            # Get an absolute datetime for trash_at
+            return datetime.datetime.utcnow() + self._trash_at
+        return self._trash_at
+
     def save_collection(self):
         if self.update:
             # Check if files should be updated on the remote collection.
@@ -615,12 +666,17 @@ class ArvPutUploadJob(object):
                 else:
                     # The file already exist on remote collection, skip it.
                     pass
-            self._remote_collection.save(num_retries=self.num_retries)
+            self._remote_collection.save(num_retries=self.num_retries,
+                                         trash_at=self._collection_trash_at())
         else:
+            if len(self._local_collection) == 0:
+                self.logger.warning("No files were uploaded, skipping collection creation.")
+                return
             self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
-                num_retries=self.num_retries)
+                num_retries=self.num_retries,
+                trash_at=self._collection_trash_at())
 
     def destroy_cache(self):
         if self.use_cache:
@@ -675,6 +731,15 @@ class ArvPutUploadJob(object):
                     self._save_state()
                 except Exception as e:
                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+            # Keep remote collection's trash_at attribute synced when using relative expiration dates
+            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
+                try:
+                    self._api_client.collections().update(
+                        uuid=self._remote_collection.manifest_locator(),
+                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
+                    ).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
@@ -686,7 +751,7 @@ class ArvPutUploadJob(object):
 
     def _write_stdin(self, filename):
         output = self._local_collection.open(filename, 'wb')
-        self._write(sys.stdin, output)
+        self._write(sys.stdin.buffer, output)
         output.close()
 
     def _check_file(self, source, filename):
@@ -730,7 +795,7 @@ class ArvPutUploadJob(object):
             elif file_in_local_collection.permission_expired():
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
-                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
+                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                 should_upload = True
                 self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
@@ -745,7 +810,7 @@ class ArvPutUploadJob(object):
                 # Inconsistent cache, re-upload the file
                 should_upload = True
                 self._local_collection.remove(filename)
-                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
         # Local file differs from cached data, re-upload it.
         else:
             if file_in_local_collection:
@@ -787,6 +852,20 @@ class ArvPutUploadJob(object):
     def _my_collection(self):
         return self._remote_collection if self.update else self._local_collection
 
+    def _get_cache_filepath(self):
+        # Set up cache file name from input paths.
+        md5 = hashlib.md5()
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
+        realpaths = sorted(os.path.realpath(path) for path in self.paths)
+        md5.update(b'\0'.join([p.encode() for p in realpaths]))
+        if self.filename:
+            md5.update(self.filename.encode())
+        cache_filename = md5.hexdigest()
+        cache_filepath = os.path.join(
+            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+            cache_filename)
+        return cache_filepath
+
     def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
@@ -796,7 +875,10 @@ class ArvPutUploadJob(object):
                                           update_collection):
             try:
                 self._remote_collection = arvados.collection.Collection(
-                    update_collection, api_client=self._api_client)
+                    update_collection,
+                    api_client=self._api_client,
+                    storage_classes_desired=self.storage_classes,
+                    num_retries=self.num_retries)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -806,23 +888,13 @@ class ArvPutUploadJob(object):
             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 
         if self.use_cache:
-            # Set up cache file name from input paths.
-            md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
-            realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update(b'\0'.join([p.encode() for p in realpaths]))
-            if self.filename:
-                md5.update(self.filename.encode())
-            cache_filename = md5.hexdigest()
-            cache_filepath = os.path.join(
-                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-                cache_filename)
+            cache_filepath = self._get_cache_filepath()
             if self.resume and os.path.exists(cache_filepath):
-                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
+                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'a+')
             else:
                 # --no-resume means start with a empty cache file.
-                self.logger.info("Creating new cache file at {}".format(cache_filepath))
+                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
@@ -842,12 +914,63 @@ class ArvPutUploadJob(object):
                 self.logger.info("No cache usage requested for this run.")
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
+            if not self._cached_manifest_valid():
+                if not self.batch_mode:
+                    raise ResumeCacheInvalidError()
+                else:
+                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
+                    self.use_cache = False # Don't overwrite preexisting cache file.
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(
                 self._state['manifest'],
                 replication_desired=self.replication_desired,
+                storage_classes_desired=self.storage_classes,
                 put_threads=self.put_threads,
-                api_client=self._api_client)
+                api_client=self._api_client,
+                num_retries=self.num_retries)
+
+    def _cached_manifest_valid(self):
+        """
+        Validate the oldest non-expired block signature to check whether the
+        cached manifest is usable, i.e. that it was not created with a different
+        Arvados account.
+        """
+        if self._state.get('manifest', None) is None:
+            # No cached manifest yet, all good.
+            return True
+        now = datetime.datetime.utcnow()
+        oldest_exp = None
+        oldest_loc = None
+        block_found = False
+        for m in keep_locator_pattern.finditer(self._state['manifest']):
+            loc = m.group(0)
+            try:
+                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
+            except IndexError:
+                # Locator without signature
+                continue
+            block_found = True
+            if exp > now and (oldest_exp is None or exp < oldest_exp):
+                oldest_exp = exp
+                oldest_loc = loc
+        if not block_found:
+            # No block signatures found => no invalid block signatures.
+            return True
+        if oldest_loc is None:
+            # Locator signatures found, but all have expired.
+            # Reset the cache and move on.
+            self.logger.info('Cache expired, starting from scratch.')
+            self._state['manifest'] = ''
+            return True
+        kc = arvados.KeepClient(api_client=self._api_client,
+                                num_retries=self.num_retries)
+        try:
+            kc.head(oldest_loc)
+        except arvados.errors.KeepRequestError:
+            # Something is wrong, cached manifest is not valid.
+            return False
+        return True
 
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively go through the entire collection `col`"""
@@ -864,7 +987,7 @@ class ArvPutUploadJob(object):
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
         except IOError:
-            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 
     def _save_state(self):
         """
@@ -897,6 +1020,9 @@ class ArvPutUploadJob(object):
     def collection_name(self):
         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
+    def collection_trash_at(self):
+        return self._my_collection().get_trash_at()
+
     def manifest_locator(self):
         return self._my_collection().manifest_locator()
 
@@ -978,10 +1104,6 @@ def progress_writer(progress_func, outfile=sys.stderr):
         outfile.write(progress_func(bytes_written, bytes_expected))
     return write_progress
 
-def exit_signal_handler(sigcode, frame):
-    logging.getLogger('arvados.arv_put').error("Caught signal {}, exiting.".format(sigcode))
-    sys.exit(-sigcode)
-
 def desired_project_uuid(api_client, project_uuid, num_retries):
     if not project_uuid:
         query = api_client.users().current()
@@ -993,7 +1115,8 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
     return query.execute(num_retries=num_retries)['uuid']
 
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
+         install_sig_handlers=True):
     global api_client
 
     args = parse_arguments(arguments)
@@ -1012,10 +1135,46 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     if api_client is None:
         api_client = arvados.api('v1', request_id=request_id)
 
-    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
-    # the originals.
-    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
-                            for sigcode in CAUGHT_SIGNALS}
+    if install_sig_handlers:
+        arv_cmd.install_signal_handlers()
+
+    # Trash arguments validation
+    trash_at = None
+    if args.trash_at is not None:
+        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
+        # make sure the user provides a complete YYYY-MM-DD date.
+        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        # Check if no time information was provided. In that case, assume end-of-day.
+        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
+            args.trash_at += 'T23:59:59'
+        try:
+            trash_at = ciso8601.parse_datetime(args.trash_at)
+        except:
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        else:
+            if trash_at.tzinfo is not None:
+                # Timezone aware datetime provided.
+                utcoffset = -trash_at.utcoffset()
+            else:
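+                # Timezone-naive datetime provided. Assume it is local time. NOTE(review): time.daylight only says a DST zone is defined, not that DST is in effect -- confirm altzone is the right offset here.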
+                if time.daylight:
+                    utcoffset = datetime.timedelta(seconds=time.altzone)
+                else:
+                    utcoffset = datetime.timedelta(seconds=time.timezone)
+            # Convert to UTC timezone naive datetime.
+            trash_at = trash_at.replace(tzinfo=None) + utcoffset
+
+        if trash_at <= datetime.datetime.utcnow():
+            logger.error("--trash-at argument must be set in the future")
+            sys.exit(1)
+    if args.trash_after is not None:
+        if args.trash_after < 1:
+            logger.error("--trash-after argument must be >= 1")
+            sys.exit(1)
+        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
 
     # Determine the name to use
     if args.name:
@@ -1051,6 +1210,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
+    # Split the --storage-classes argument into a list
+    storage_classes = None
+    if args.storage_classes:
+        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
+
     # Setup exclude regex from all the --exclude arguments provided
     name_patterns = []
     exclude_paths = []
@@ -1098,6 +1262,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
+                                 batch_mode= args.batch,
                                  filename = args.filename,
                                  reporter = reporter,
                                  api_client = api_client,
@@ -1108,17 +1273,28 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,
                                  update_collection = args.update_collection,
+                                 storage_classes=storage_classes,
                                  logger=logger,
                                  dry_run=args.dry_run,
                                  follow_links=args.follow_links,
                                  exclude_paths=exclude_paths,
-                                 exclude_names=exclude_names)
+                                 exclude_names=exclude_names,
+                                 trash_at=trash_at)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
             "         Use --no-cache if this is really what you want."]))
         sys.exit(1)
-    except CollectionUpdateError as error:
+    except ResumeCacheInvalidError:
+        logger.error("\n".join([
+            "arv-put: Resume cache contains invalid signature: it may have expired",
+            "         or been created with another Arvados user's credentials.",
+            "         Switch user or use one of the following options to restart upload:",
+            "         --no-resume to start a new resume cache.",
+            "         --no-cache to disable resume cache.",
+            "         --batch to ignore the resume cache if invalid."]))
+        sys.exit(1)
+    except (CollectionUpdateError, PathDoesNotExistError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
@@ -1128,10 +1304,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     except ArvPutUploadNotPending:
         # No files pending for upload
         sys.exit(0)
-    except PathDoesNotExistError as error:
-        logger.error("\n".join([
-            "arv-put: %s" % str(error)]))
-        sys.exit(1)
 
     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
@@ -1143,7 +1315,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
-    except arvados.errors.ApiError as error:
+    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
@@ -1158,12 +1330,23 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             output = writer.manifest_text()
     elif args.raw:
         output = ','.join(writer.data_locators())
-    else:
+    elif writer.manifest_locator() is not None:
         try:
+            expiration_notice = ""
+            if writer.collection_trash_at() is not None:
+                # Get the local timezone-naive version, and log it with timezone information.
+                if time.daylight:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
+                else:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
+                expiration_notice = ". It will expire on {} {}.".format(
+                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
             if args.update_collection:
-                logger.info("Collection updated: '{}'".format(writer.collection_name()))
+                logger.info(u"Collection updated: '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             else:
-                logger.info("Collection saved as '{}'".format(writer.collection_name()))
+                logger.info(u"Collection saved as '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
@@ -1173,6 +1356,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
+    else:
+        status = 1
 
     # Print the locator (uuid) of the new collection.
     if output is None:
@@ -1182,8 +1367,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         if not output.endswith('\n'):
             stdout.write('\n')
 
-    for sigcode, orig_handler in listitems(orig_signal_handlers):
-        signal.signal(sigcode, orig_handler)
+    if install_sig_handlers:
+        arv_cmd.restore_signal_handlers()
 
     if status != 0:
         sys.exit(status)