Merge branch '2411-check-copyright'
[arvados.git] / sdk / python / arvados / commands / put.py
index 1a274104fc76c5ec6449dee5b8387fe3d478170b..548f4b0948ae715393eb657a1693364e8b500639 100644 (file)
@@ -1,28 +1,35 @@
-#!/usr/bin/env python
-
-# TODO:
-# --md5sum - display md5 of each file as read from disk
-
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import division
+from future.utils import listitems, listvalues
+from builtins import str
+from builtins import object
 import argparse
 import arvados
 import arvados.collection
 import base64
 import argparse
 import arvados
 import arvados.collection
 import base64
+import copy
 import datetime
 import errno
 import fcntl
 import hashlib
 import json
 import datetime
 import errno
 import fcntl
 import hashlib
 import json
+import logging
 import os
 import pwd
 import os
 import pwd
-import time
+import re
 import signal
 import socket
 import sys
 import tempfile
 import threading
 import signal
 import socket
 import sys
 import tempfile
 import threading
-import copy
-import logging
+import time
+import traceback
+
 from apiclient import errors as apiclient_errors
 from apiclient import errors as apiclient_errors
+from arvados._version import __version__
 
 import arvados.commands._util as arv_cmd
 
 
 import arvados.commands._util as arv_cmd
 
@@ -31,21 +38,20 @@ api_client = None
 
 upload_opts = argparse.ArgumentParser(add_help=False)
 
 
 upload_opts = argparse.ArgumentParser(add_help=False)
 
+upload_opts.add_argument('--version', action='version',
+                         version="%s %s" % (sys.argv[0], __version__),
+                         help='Print version and exit.')
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
-Local file or directory. Default: read from standard input.
+Local file or directory. If path is a directory reference with a trailing
+slash, then just upload the directory's contents; otherwise upload the
+directory itself. Default: read from standard input.
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
 """)
 
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
-                    default=-1, help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
+                    default=-1, help=argparse.SUPPRESS)
 
 _group.add_argument('--normalize', action='store_true',
                     help="""
 
 _group.add_argument('--normalize', action='store_true',
                     help="""
@@ -53,6 +59,12 @@ Normalize the manifest by re-ordering files and streams after writing
 data.
 """)
 
 data.
 """)
 
+_group.add_argument('--dry-run', action='store_true', default=False,
+                    help="""
+Don't actually upload files, but only check if any file should be
+uploaded. Exit with code=2 when files are pending for upload.
+""")
+
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
@@ -96,6 +108,12 @@ separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
 manifest.
 """)
 
+upload_opts.add_argument('--update-collection', type=str, default=None,
+                         dest='update_collection', metavar="UUID", help="""
+Update an existing collection identified by the given Arvados collection
+UUID. All new local files will be uploaded.
+""")
+
 upload_opts.add_argument('--use-filename', type=str, default=None,
                          dest='filename', help="""
 Synonym for --filename.
 upload_opts.add_argument('--use-filename', type=str, default=None,
                          dest='filename', help="""
 Synonym for --filename.
@@ -122,6 +140,15 @@ physical storage devices (e.g., disks) should have a copy of each data
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
+upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
+                         help="""
+Set the number of upload threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 2 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
@@ -163,6 +190,26 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--follow-links', action='store_true', default=True,
+                    dest='follow_links', help="""
+Follow file and directory symlinks (default).
+""")
+_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+                    help="""
+Do not follow file and directory symlinks.
+""")
+
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+                    help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+                    help="""
+Do not save upload state in a cache file for resuming.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -173,7 +220,7 @@ def parse_arguments(arguments):
     if len(args.paths) == 0:
         args.paths = ['-']
 
     if len(args.paths) == 0:
         args.paths = ['-']
 
-    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
+    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 
     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
         if args.filename:
 
     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
         if args.filename:
@@ -187,17 +234,58 @@ def parse_arguments(arguments):
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
+    # Turn off --resume (default) if --no-cache is used.
+    if not args.use_cache:
+        args.resume = False
+
     if args.paths == ['-']:
     if args.paths == ['-']:
+        if args.update_collection:
+            arg_parser.error("""
+    --update-collection cannot be used when reading from stdin.
+    """)
         args.resume = False
         args.resume = False
+        args.use_cache = False
         if not args.filename:
             args.filename = 'stdin'
 
     return args
 
         if not args.filename:
             args.filename = 'stdin'
 
     return args
 
+
+class PathDoesNotExistError(Exception):
+    pass
+
+
+class CollectionUpdateError(Exception):
+    pass
+
+
 class ResumeCacheConflict(Exception):
     pass
 
 
 class ResumeCacheConflict(Exception):
     pass
 
 
+class ArvPutArgumentConflict(Exception):
+    pass
+
+
+class ArvPutUploadIsPending(Exception):
+    pass
+
+
+class ArvPutUploadNotPending(Exception):
+    pass
+
+
+class FileUploadList(list):
+    def __init__(self, dry_run=False):
+        list.__init__(self)
+        self.dry_run = dry_run
+
+    def append(self, other):
+        if self.dry_run:
+            raise ArvPutUploadIsPending()
+        super(FileUploadList, self).append(other)
+
+
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
@@ -209,13 +297,13 @@ class ResumeCache(object):
     @classmethod
     def make_path(cls, args):
         md5 = hashlib.md5()
     @classmethod
     def make_path(cls, args):
         md5 = hashlib.md5()
-        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
-        md5.update('\0'.join(realpaths))
+        md5.update(b'\0'.join([p.encode() for p in realpaths]))
         if any(os.path.isdir(path) for path in realpaths):
         if any(os.path.isdir(path) for path in realpaths):
-            md5.update(str(max(args.max_manifest_depth, -1)))
+            md5.update(b'-1')
         elif args.filename:
         elif args.filename:
-            md5.update(args.filename)
+            md5.update(args.filename.encode())
         return os.path.join(
             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
             md5.hexdigest())
         return os.path.join(
             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
             md5.hexdigest())
@@ -287,12 +375,17 @@ class ArvPutUploadJob(object):
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
-    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                 name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, replication_desired=None,
-                 filename=None, update_time=60.0):
+    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+                 bytes_expected=None, name=None, owner_uuid=None,
+                 ensure_unique_name=False, num_retries=None,
+                 put_threads=None, replication_desired=None,
+                 filename=None, update_time=60.0, update_collection=None,
+                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
+                 follow_links=True):
         self.paths = paths
         self.resume = resume
         self.paths = paths
         self.resume = resume
+        self.use_cache = use_cache
+        self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
@@ -302,56 +395,143 @@ class ArvPutUploadJob(object):
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
         self.replication_desired = replication_desired
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
         self.replication_desired = replication_desired
+        self.put_threads = put_threads
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
-        self._collection = None
         self._collection_lock = threading.Lock()
         self._collection_lock = threading.Lock()
+        self._remote_collection = None # Collection being updated (if asked)
+        self._local_collection = None # Collection from previous run manifest
+        self._file_paths = set() # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
+        self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._update_task_time = update_time  # How many seconds wait between update runs
-        self.logger = logging.getLogger('arvados.arv_put')
+        self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self._upload_started = False
+        self.logger = logger
+        self.dry_run = dry_run
+        self._checkpoint_before_quit = True
+        self.follow_links = follow_links
+
+        if not self.use_cache and self.resume:
+            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
+
+        # Check for obvious dry-run responses
+        if self.dry_run and (not self.use_cache or not self.resume):
+            raise ArvPutUploadIsPending()
+
         # Load cached data if any and if needed
         # Load cached data if any and if needed
-        self._setup_state()
+        self._setup_state(update_collection)
 
 
-    def start(self):
+    def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
         """
         Start supporting thread & file uploading
         """
-        self._checkpointer.daemon = True
-        self._checkpointer.start()
+        if not self.dry_run:
+            self._checkpointer.start()
         try:
             for path in self.paths:
                 # Test for stdin first, in case some file named '-' exist
                 if path == '-':
         try:
             for path in self.paths:
                 # Test for stdin first, in case some file named '-' exist
                 if path == '-':
+                    if self.dry_run:
+                        raise ArvPutUploadIsPending()
                     self._write_stdin(self.filename or 'stdin')
                     self._write_stdin(self.filename or 'stdin')
+                elif not os.path.exists(path):
+                     raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                 elif os.path.isdir(path):
                 elif os.path.isdir(path):
-                    self._write_directory_tree(path)
+                    # Use absolute paths on cache index so CWD doesn't interfere
+                    # with the caching logic.
+                    orig_path = path
+                    path = os.path.abspath(path)
+                    if orig_path[-1:] == os.sep:
+                        # When passing a directory reference with a trailing slash,
+                        # its contents should be uploaded directly to the collection's root.
+                        prefixdir = path
+                    else:
+                        # When passing a directory reference with no trailing slash,
+                        # upload the directory to the collection's root.
+                        prefixdir = os.path.dirname(path)
+                    prefixdir += os.sep
+                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
+                        # Make os.walk()'s dir traversing order deterministic
+                        dirs.sort()
+                        files.sort()
+                        for f in files:
+                            self._check_file(os.path.join(root, f),
+                                             os.path.join(root[len(prefixdir):], f))
                 else:
                 else:
-                    self._write_file(path, self.filename or os.path.basename(path))
-        finally:
-            # Stop the thread before doing anything else
-            self._stop_checkpointer.set()
-            self._checkpointer.join()
-            # Commit all & one last _update()
-            self.manifest_text()
+                    self._check_file(os.path.abspath(path),
+                                     self.filename or os.path.basename(path))
+            # If dry-run mode is on and we got up to this point, then we should
+            # notify that there aren't any files to upload.
+            if self.dry_run:
+                raise ArvPutUploadNotPending()
+            # Remove local_collection's files that don't exist locally anymore, so the
+            # bytes_written count is correct.
+            for f in self.collection_file_paths(self._local_collection,
+                                                path_prefix=""):
+                if f != 'stdin' and f != self.filename and not f in self._file_paths:
+                    self._local_collection.remove(f)
+            # Update bytes_written from current local collection and
+            # report initial progress.
             self._update()
             self._update()
-            if self.resume:
+            # Actual file upload
+            self._upload_started = True # Used by the update thread to start checkpointing
+            self._upload_files()
+        except (SystemExit, Exception) as e:
+            self._checkpoint_before_quit = False
+            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
+            # Note: We're expecting SystemExit instead of
+            # KeyboardInterrupt because we have a custom signal
+            # handler in place that raises SystemExit with the caught
+            # signal's code.
+            if isinstance(e, PathDoesNotExistError):
+                # We aren't interested in the traceback for this case
+                pass
+            elif not isinstance(e, SystemExit) or e.code != -2:
+                self.logger.warning("Abnormal termination:\n{}".format(
+                    traceback.format_exc()))
+            raise
+        finally:
+            if not self.dry_run:
+                # Stop the thread before doing anything else
+                self._stop_checkpointer.set()
+                self._checkpointer.join()
+                if self._checkpoint_before_quit:
+                    # Commit all pending blocks & one last _update()
+                    self._local_collection.manifest_text()
+                    self._update(final=True)
+                    if save_collection:
+                        self.save_collection()
+            if self.use_cache:
                 self._cache_file.close()
                 self._cache_file.close()
-                # Correct the final written bytes count
-                self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
 
     def save_collection(self):
-        with self._collection_lock:
-            self._my_collection().save_new(
+        if self.update:
+            # Check if files should be updated on the remote collection.
+            for fp in self._file_paths:
+                remote_file = self._remote_collection.find(fp)
+                if not remote_file:
+                    # File doesn't exist on remote collection, copy it.
+                    self._remote_collection.copy(fp, fp, self._local_collection)
+                elif remote_file != self._local_collection.find(fp):
+                    # A different file exists on remote collection, overwrite it.
+                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
+                else:
+                    # The file already exists on remote collection, skip it.
+                    pass
+            self._remote_collection.save(num_retries=self.num_retries)
+        else:
+            self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries)
 
     def destroy_cache(self):
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries)
 
     def destroy_cache(self):
-        if self.resume:
+        if self.use_cache:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
@@ -365,7 +545,7 @@ class ArvPutUploadJob(object):
         Recursively get the total size of the collection
         """
         size = 0
         Recursively get the total size of the collection
         """
         size = 0
-        for item in collection.values():
+        for item in listvalues(collection):
             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                 size += self._collection_size(item)
             else:
             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                 size += self._collection_size(item)
             else:
@@ -377,22 +557,34 @@ class ArvPutUploadJob(object):
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
-        while not self._stop_checkpointer.wait(self._update_task_time):
+        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
             self._update()
 
             self._update()
 
-    def _update(self):
+    def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
         """
         Update cached manifest text and report progress.
         """
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
-            # Update cache, if resume enabled
-            if self.resume:
-                with self._state_lock:
-                    # Get the manifest text without comitting pending blocks
-                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
-        if self.resume:
-            self._save_state()
+        if self._upload_started:
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._local_collection)
+                if self.use_cache:
+                    if final:
+                        manifest = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without committing pending blocks
+                        manifest = self._local_collection.manifest_text(strip=False,
+                                                                        normalize=False,
+                                                                        only_committed=True)
+                    # Update cache
+                    with self._state_lock:
+                        self._state['manifest'] = manifest
+            if self.use_cache:
+                try:
+                    self._save_state()
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+        else:
+            self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         self.report_progress()
 
         # Call the reporter, if any
         self.report_progress()
 
@@ -400,79 +592,91 @@ class ArvPutUploadJob(object):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def _write_directory_tree(self, path, stream_name="."):
-        # TODO: Check what happens when multiple directories are passed as
-        # arguments.
-        # If the code below is uncommented, integration test
-        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
-        # fails, I suppose it is because the manifest_uuid changes because
-        # of the dir addition to stream_name.
-
-        # if stream_name == '.':
-        #     stream_name = os.path.join('.', os.path.basename(path))
-        for item in os.listdir(path):
-            if os.path.isdir(os.path.join(path, item)):
-                self._write_directory_tree(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-            else:
-                self._write_file(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-
     def _write_stdin(self, filename):
     def _write_stdin(self, filename):
-        with self._collection_lock:
-            output = self._my_collection().open(filename, 'w')
+        output = self._local_collection.open(filename, 'wb')
         self._write(sys.stdin, output)
         output.close()
 
         self._write(sys.stdin, output)
         output.close()
 
-    def _write_file(self, source, filename):
+    def _check_file(self, source, filename):
+        """
+        Check if this file needs to be uploaded
+        """
+        # Ignore symlinks when requested
+        if (not self.follow_links) and os.path.islink(source):
+            return
         resume_offset = 0
         resume_offset = 0
-        if self.resume:
-            # Check if file was already uploaded (at least partially)
-            with self._collection_lock:
-                try:
-                    file_in_collection = self._my_collection().find(filename)
-                except IOError:
-                    # Not found
-                    file_in_collection = None
+        should_upload = False
+        new_file_in_cache = False
+        # Record file path for updating the remote collection before exiting
+        self._file_paths.add(filename)
+
+        with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
             # repeated run.
             if source not in self._state['files']:
             # If no previous cached data on this file, store it for an eventual
             # repeated run.
             if source not in self._state['files']:
-                with self._state_lock:
-                    self._state['files'][source] = {
-                        'mtime': os.path.getmtime(source),
-                        'size' : os.path.getsize(source)
-                    }
-            with self._state_lock:
-                cached_file_data = self._state['files'][source]
-            # See if this file was already uploaded at least partially
-            if file_in_collection:
-                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if cached_file_data['size'] == file_in_collection.size():
-                        # File already there, skip it.
-                        self.bytes_skipped += cached_file_data['size']
-                        return
-                    elif cached_file_data['size'] > file_in_collection.size():
-                        # File partially uploaded, resume!
-                        resume_offset = file_in_collection.size()
-                    else:
-                        # Inconsistent cache, re-upload the file
-                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
-                else:
-                    # Local file differs from cached data, re-upload it
-                    pass
-        with open(source, 'r') as source_fd:
-            if resume_offset > 0:
-                # Start upload where we left off
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'a')
-                source_fd.seek(resume_offset)
+                self._state['files'][source] = {
+                    'mtime': os.path.getmtime(source),
+                    'size' : os.path.getsize(source)
+                }
+                new_file_in_cache = True
+            cached_file_data = self._state['files'][source]
+
+        # Check if file was already uploaded (at least partially)
+        file_in_local_collection = self._local_collection.find(filename)
+
+        # If not resuming, upload the full file.
+        if not self.resume:
+            should_upload = True
+        # New file detected from last run, upload it.
+        elif new_file_in_cache:
+            should_upload = True
+        # Local file didn't change from last run.
+        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+            if not file_in_local_collection:
+                # File not uploaded yet, upload it completely
+                should_upload = True
+            elif file_in_local_collection.permission_expired():
+                # Permission token expired, re-upload file. This will change whenever
+                # we have an API for refreshing tokens.
+                should_upload = True
+                self._local_collection.remove(filename)
+            elif cached_file_data['size'] == file_in_local_collection.size():
+                # File already there, skip it.
+                self.bytes_skipped += cached_file_data['size']
+            elif cached_file_data['size'] > file_in_local_collection.size():
+                # File partially uploaded, resume!
+                resume_offset = file_in_local_collection.size()
                 self.bytes_skipped += resume_offset
                 self.bytes_skipped += resume_offset
+                should_upload = True
             else:
             else:
-                # Start from scratch
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'w')
-            self._write(source_fd, output)
-            output.close()
+                # Inconsistent cache, re-upload the file
+                should_upload = True
+                self._local_collection.remove(filename)
+                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+        # Local file differs from cached data, re-upload it.
+        else:
+            if file_in_local_collection:
+                self._local_collection.remove(filename)
+            should_upload = True
+
+        if should_upload:
+            self._files_to_upload.append((source, resume_offset, filename))
+
+    def _upload_files(self):
+        for source, resume_offset, filename in self._files_to_upload:
+            with open(source, 'rb') as source_fd:
+                with self._state_lock:
+                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
+                    self._state['files'][source]['size'] = os.path.getsize(source)
+                if resume_offset > 0:
+                    # Start upload where we left off
+                    output = self._local_collection.open(filename, 'ab')
+                    source_fd.seek(resume_offset)
+                else:
+                    # Start from scratch
+                    output = self._local_collection.open(filename, 'wb')
+                self._write(source_fd, output)
+                output.close(flush=False)
 
     def _write(self, source_fd, output):
         while True:
 
     def _write(self, source_fd, output):
         while True:
@@ -482,42 +686,50 @@ class ArvPutUploadJob(object):
             output.write(data)
 
     def _my_collection(self):
             output.write(data)
 
     def _my_collection(self):
-        """
-        Create a new collection if none cached. Load it from cache otherwise.
-        """
-        if self._collection is None:
-            with self._state_lock:
-                manifest = self._state['manifest']
-            if self.resume and manifest is not None:
-                # Create collection from saved state
-                self._collection = arvados.collection.Collection(
-                    manifest,
-                    replication_desired=self.replication_desired)
-            else:
-                # Create new collection
-                self._collection = arvados.collection.Collection(
-                    replication_desired=self.replication_desired)
-        return self._collection
+        """
+        Return the collection this job reports on: the remote collection
+        when self.update is set (an existing collection is being updated),
+        the local collection otherwise.
+        """
+        return self._remote_collection if self.update else self._local_collection
 
 
-    def _setup_state(self):
+    def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
         """
         """
         Create a new cache file or load a previously existing one.
         """
-        if self.resume:
+        # Load an already existing collection for update
+        if update_collection and re.match(arvados.util.collection_uuid_pattern,
+                                          update_collection):
+            try:
+                self._remote_collection = arvados.collection.Collection(update_collection)
+            except arvados.errors.ApiError as error:
+                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
+            else:
+                self.update = True
+        elif update_collection:
+            # Collection locator provided, but unknown format
+            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+
+        if self.use_cache:
+            # Set up cache file name from input paths.
             md5 = hashlib.md5()
             md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update('\0'.join(realpaths))
+            md5.update(b'\0'.join([p.encode() for p in realpaths]))
             if self.filename:
             if self.filename:
-                md5.update(self.filename)
+                md5.update(self.filename.encode())
             cache_filename = md5.hexdigest()
             cache_filename = md5.hexdigest()
-            self._cache_file = open(os.path.join(
+            cache_filepath = os.path.join(
                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-                cache_filename), 'a+')
+                cache_filename)
+            if self.resume and os.path.exists(cache_filepath):
+                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
+                self._cache_file = open(cache_filepath, 'a+')
+            else:
+                # --no-resume means start with a empty cache file.
+                self.logger.info("Creating new cache file at {}".format(cache_filepath))
+                self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
             self._cache_file.seek(0)
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
             self._cache_file.seek(0)
-            with self._state_lock:
+
+        with self._state_lock:
+            if self.use_cache:
                 try:
                     self._state = json.load(self._cache_file)
                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                 try:
                     self._state = json.load(self._cache_file)
                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
@@ -526,13 +738,23 @@ class ArvPutUploadJob(object):
                 except ValueError:
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
                 except ValueError:
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-            # Load how many bytes were uploaded on previous run
-            with self._collection_lock:
-                self.bytes_written = self._collection_size(self._my_collection())
-        # No resume required
-        else:
-            with self._state_lock:
+            else:
+                self.logger.info("No cache usage requested for this run.")
+                # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
                 self._state = copy.deepcopy(self.EMPTY_STATE)
+            # Load the previous manifest so we can check if files were modified remotely.
+            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
+
+    def collection_file_paths(self, col, path_prefix='.'):
+        """
+        Return a list with the paths of all the files found by recursively
+        walking the collection `col`.
+
+        Every path is built by joining `path_prefix` with the item names
+        leading to the file. Items that are neither files nor
+        subcollections are ignored.
+        """
+        file_paths = []
+        for name, item in listitems(col):
+            if isinstance(item, arvados.arvfile.ArvadosFile):
+                file_paths.append(os.path.join(path_prefix, name))
+            elif isinstance(item, arvados.collection.Subcollection):
+                # Descend into the subcollection, extending the prefix
+                new_prefix = os.path.join(path_prefix, name)
+                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
+        return file_paths
 
     def _lock_file(self, fileobj):
         try:
 
     def _lock_file(self, fileobj):
         try:
@@ -544,17 +766,20 @@ class ArvPutUploadJob(object):
         """
         Atomically save current state into cache.
         """
         """
         Atomically save current state into cache.
         """
+        with self._state_lock:
+            # We're not using copy.deepcopy() here because it's a lot slower
+            # than json.dumps(), and we're already needing JSON format to be
+            # saved on disk.
+            state = json.dumps(self._state)
         try:
         try:
-            with self._state_lock:
-                state = self._state
-            new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_filename))
-            self._lock_file(new_cache_fd)
-            new_cache = os.fdopen(new_cache_fd, 'r+')
-            json.dump(state, new_cache)
+            new_cache = tempfile.NamedTemporaryFile(
+                mode='w+',
+                dir=os.path.dirname(self._cache_filename), delete=False)
+            self._lock_file(new_cache)
+            new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
             new_cache.flush()
             os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_filename)
+            os.rename(new_cache.name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
             self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
         except (IOError, OSError, ResumeCacheConflict) as error:
             self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
@@ -566,24 +791,23 @@ class ArvPutUploadJob(object):
             self._cache_file = new_cache
 
     def collection_name(self):
             self._cache_file = new_cache
 
     def collection_name(self):
-        with self._collection_lock:
-            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
-        return name
+        """
+        Return the 'name' field of the API response of the collection in
+        use, or None when that collection has no API response.
+        """
+        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def manifest_locator(self):
 
     def manifest_locator(self):
-        with self._collection_lock:
-            locator = self._my_collection().manifest_locator()
-        return locator
+        """
+        Return the manifest locator of the collection in use.
+        """
+        return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
 
     def portable_data_hash(self):
-        with self._collection_lock:
-            datahash = self._my_collection().portable_data_hash()
-        return datahash
+        pdh = self._my_collection().portable_data_hash()
+        m = self._my_collection().stripped_manifest().encode()
+        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
+        if pdh != local_pdh:
+            logger.warning("\n".join([
+                "arv-put: API server provided PDH differs from local manifest.",
+                "         This should not happen; showing API server version."]))
+        return pdh
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        with self._collection_lock:
-            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
-        return manifest
+        """
+        Return the manifest text of the collection in use.
+
+        All the arguments are passed through, unchanged, to the
+        collection's own manifest_text() method.
+        """
+        return self._my_collection().manifest_text(stream_name, strip, normalize)
 
     def _datablocks_on_item(self, item):
         """
 
     def _datablocks_on_item(self, item):
         """
@@ -591,13 +815,17 @@ class ArvPutUploadJob(object):
         through subcollections
         """
         if isinstance(item, arvados.arvfile.ArvadosFile):
         through subcollections
         """
         if isinstance(item, arvados.arvfile.ArvadosFile):
-            locators = []
-            for segment in item.segments():
-                loc = segment.locator
-                locators.append(loc)
-            return locators
+            if item.size() == 0:
+                # Empty file locator
+                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+            else:
+                locators = []
+                for segment in item.segments():
+                    loc = segment.locator
+                    locators.append(loc)
+                return locators
         elif isinstance(item, arvados.collection.Collection):
         elif isinstance(item, arvados.collection.Collection):
-            l = [self._datablocks_on_item(x) for x in item.values()]
+            l = [self._datablocks_on_item(x) for x in listvalues(item)]
             # Fast list flattener method taken from:
             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
             return [loc for sublist in l for loc in sublist]
             # Fast list flattener method taken from:
             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
             return [loc for sublist in l for loc in sublist]
@@ -612,14 +840,20 @@ class ArvPutUploadJob(object):
         return datablocks
 
 
         return datablocks
 
 
-def expected_bytes_for(pathlist):
+def expected_bytes_for(pathlist, follow_links=True):
     # Walk the given directory trees and stat files, adding up file sizes,
     # so we can display progress as percent
     bytesum = 0
     for path in pathlist:
         if os.path.isdir(path):
     # Walk the given directory trees and stat files, adding up file sizes,
     # so we can display progress as percent
     bytesum = 0
     for path in pathlist:
         if os.path.isdir(path):
-            for filename in arvados.util.listdir_recursive(path):
-                bytesum += os.path.getsize(os.path.join(path, filename))
+            for root, dirs, files in os.walk(path, followlinks=follow_links):
+                # Sum file sizes
+                for f in files:
+                    filepath = os.path.join(root, f)
+                    # Ignore symlinked files when requested
+                    if (not follow_links) and os.path.islink(filepath):
+                        continue
+                    bytesum += os.path.getsize(filepath)
         elif not os.path.isfile(path):
             return None
         else:
         elif not os.path.isfile(path):
             return None
         else:
@@ -662,6 +896,8 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
+    logger = logging.getLogger('arvados.arv_put')
+    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -670,7 +906,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
-            print >>stderr, "Cannot use --name with --stream or --raw"
+            logger.error("Cannot use --name with --stream or --raw")
+            sys.exit(1)
+        elif args.update_collection:
+            logger.error("Cannot use --name with --update-collection")
             sys.exit(1)
         collection_name = args.name
     else:
             sys.exit(1)
         collection_name = args.name
     else:
@@ -680,7 +919,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             socket.gethostname())
 
     if args.project_uuid and (args.stream or args.raw):
             socket.gethostname())
 
     if args.project_uuid and (args.stream or args.raw):
-        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
+        logger.error("Cannot use --project-uuid with --stream or --raw")
         sys.exit(1)
 
     # Determine the parent project
         sys.exit(1)
 
     # Determine the parent project
@@ -688,7 +927,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                             args.retries)
     except (apiclient_errors.Error, ValueError) as error:
         project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                             args.retries)
     except (apiclient_errors.Error, ValueError) as error:
-        print >>stderr, error
+        logger.error(error)
         sys.exit(1)
 
     if args.progress:
         sys.exit(1)
 
     if args.progress:
@@ -698,38 +937,77 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
     else:
         reporter = None
 
-    bytes_expected = expected_bytes_for(args.paths)
+    # If this is used by a human, and there's at least one directory to be
+    # uploaded, the expected bytes calculation can take a moment.
+    if args.progress and any([os.path.isdir(f) for f in args.paths]):
+        logger.info("Calculating upload size, this could take some time...")
+    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)
+
     try:
         writer = ArvPutUploadJob(paths = args.paths,
     try:
         writer = ArvPutUploadJob(paths = args.paths,
-                                resume = args.resume,
-                                reporter = reporter,
-                                bytes_expected = bytes_expected,
-                                num_retries = args.retries,
-                                replication_desired = args.replication,
-                                name = collection_name,
-                                owner_uuid = project_uuid,
-                                ensure_unique_name = True)
+                                 resume = args.resume,
+                                 use_cache = args.use_cache,
+                                 filename = args.filename,
+                                 reporter = reporter,
+                                 bytes_expected = bytes_expected,
+                                 num_retries = args.retries,
+                                 replication_desired = args.replication,
+                                 put_threads = args.threads,
+                                 name = collection_name,
+                                 owner_uuid = project_uuid,
+                                 ensure_unique_name = True,
+                                 update_collection = args.update_collection,
+                                 logger=logger,
+                                 dry_run=args.dry_run,
+                                 follow_links=args.follow_links)
     except ResumeCacheConflict:
     except ResumeCacheConflict:
-        print >>stderr, "\n".join([
+        logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
             "arv-put: Another process is already uploading this data.",
-            "         Use --no-resume if this is really what you want."])
+            "         Use --no-cache if this is really what you want."]))
+        sys.exit(1)
+    except CollectionUpdateError as error:
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
         sys.exit(1)
         sys.exit(1)
+    except ArvPutUploadIsPending:
+        # Dry run check successful, return proper exit code.
+        sys.exit(2)
+    except ArvPutUploadNotPending:
+        # No files pending for upload
+        sys.exit(0)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if args.resume and writer.bytes_written > 0:
-        print >>stderr, "\n".join([
-                "arv-put: Resuming previous upload from last checkpoint.",
-                "         Use the --no-resume option to start over."])
+    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
+        logger.warning("\n".join([
+            "arv-put: Resuming previous upload from last checkpoint.",
+            "         Use the --no-resume option to start over."]))
 
 
-    writer.report_progress()
+    if not args.dry_run:
+        writer.report_progress()
     output = None
     output = None
-    writer.start()
+    try:
+        writer.start(save_collection=not(args.stream or args.raw))
+    except arvados.errors.ApiError as error:
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
+        sys.exit(1)
+    except ArvPutUploadIsPending:
+        # Dry run check successful, return proper exit code.
+        sys.exit(2)
+    except ArvPutUploadNotPending:
+        # No files pending for upload
+        sys.exit(0)
+    except PathDoesNotExistError as error:
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
+        sys.exit(1)
+
     if args.progress:  # Print newline to split stderr from stdout for humans.
     if args.progress:  # Print newline to split stderr from stdout for humans.
-        print >>stderr
+        logger.info("\n")
 
     if args.stream:
         if args.normalize:
 
     if args.stream:
         if args.normalize:
@@ -740,14 +1018,16 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         output = ','.join(writer.data_locators())
     else:
         try:
         output = ','.join(writer.data_locators())
     else:
         try:
-            writer.save_collection()
-            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+            if args.update_collection:
+                logger.info("Collection updated: '{}'".format(writer.collection_name()))
+            else:
+                logger.info("Collection saved as '{}'".format(writer.collection_name()))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
                 output = writer.manifest_locator()
         except apiclient_errors.Error as error:
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
                 output = writer.manifest_locator()
         except apiclient_errors.Error as error:
-            print >>stderr, (
+            logger.error(
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
@@ -760,14 +1040,13 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         if not output.endswith('\n'):
             stdout.write('\n')
 
         if not output.endswith('\n'):
             stdout.write('\n')
 
-    for sigcode, orig_handler in orig_signal_handlers.items():
+    for sigcode, orig_handler in listitems(orig_signal_handlers):
         signal.signal(sigcode, orig_handler)
 
     if status != 0:
         sys.exit(status)
 
     # Success!
         signal.signal(sigcode, orig_handler)
 
     if status != 0:
         sys.exit(status)
 
     # Success!
-    writer.destroy_cache()
     return output
 
 
     return output