11507: Cleanup
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index d8eb87272fa3326a0af033bb4ee719a9f714f3fc..42510754aba4724bb7b91aa061125fce52a1c1a0 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -23,7 +23,10 @@ import sys
 import tempfile
 import threading
 import time
+import traceback
+
 from apiclient import errors as apiclient_errors
+from arvados._version import __version__
 
 import arvados.commands._util as arv_cmd
 
@@ -32,6 +35,9 @@ api_client = None
 
 upload_opts = argparse.ArgumentParser(add_help=False)
 
+upload_opts.add_argument('--version', action='version',
+                         version="%s %s" % (sys.argv[0], __version__),
+                         help='Print version and exit.')
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
 Local file or directory. Default: read from standard input.
@@ -40,13 +46,7 @@ Local file or directory. Default: read from standard input.
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
-                    default=-1, help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
+                    default=-1, help=argparse.SUPPRESS)
 
 _group.add_argument('--normalize', action='store_true',
                     help="""
@@ -54,6 +54,12 @@ Normalize the manifest by re-ordering files and streams after writing
 data.
 """)
 
+_group.add_argument('--dry-run', action='store_true', default=False,
+                    help="""
+Don't actually upload files, but only check if any file should be
+uploaded. Exit with code=2 when files are pending for upload.
+""")
+
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
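The --dry-run option added in the hunk above is driven entirely by exit status: per its help text, arv-put exits with code 2 when files are pending upload, and with 0 when nothing would be uploaded. A minimal, hypothetical sketch of checking that status from Python (assumes a configured Arvados client; the path is illustrative):

    import arvados.commands.put as arv_put

    try:
        arv_put.main(['--dry-run', '/data/run1'])
    except SystemExit as status:
        # main() exits via sys.exit(): 2 = files pending upload, 0 = nothing to do
        if status.code == 2:
            print('new or modified files would be uploaded')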
@@ -129,6 +135,15 @@ physical storage devices (e.g., disks) should have a copy of each data
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
+upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
+                         help="""
+Set the number of upload threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 2 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
@@ -170,6 +185,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+                    help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+                    help="""
+Do not save upload state in a cache file for resuming.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -194,12 +219,17 @@ def parse_arguments(arguments):
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
+    # Turn off --resume (default) if --no-cache is used.
+    if not args.use_cache:
+        args.resume = False
+
     if args.paths == ['-']:
         if args.update_collection:
             arg_parser.error("""
     --update-collection cannot be used when reading from stdin.
     """)
         args.resume = False
+        args.use_cache = False
         if not args.filename:
             args.filename = 'stdin'
 
@@ -210,14 +240,33 @@ class CollectionUpdateError(Exception):
     pass
 
 
-class ResumeCacheInvalid(Exception):
+class ResumeCacheConflict(Exception):
     pass
 
 
-class ResumeCacheConflict(Exception):
+class ArvPutArgumentConflict(Exception):
+    pass
+
+
+class ArvPutUploadIsPending(Exception):
+    pass
+
+
+class ArvPutUploadNotPending(Exception):
     pass
 
 
+class FileUploadList(list):
+    def __init__(self, dry_run=False):
+        list.__init__(self)
+        self.dry_run = dry_run
+
+    def append(self, other):
+        if self.dry_run:
+            raise ArvPutUploadIsPending()
+        super(FileUploadList, self).append(other)
+
+
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
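The FileUploadList helper added above is what makes --dry-run cheap: when dry_run is set, the first file queued for upload raises ArvPutUploadIsPending instead of being appended, so scanning stops at the first pending file. A small illustrative sketch using only the names defined in this hunk:

    files = FileUploadList(dry_run=True)
    try:
        files.append(('/tmp/example.txt', 0, 'example.txt'))  # (source, resume_offset, filename)
    except ArvPutUploadIsPending:
        pass  # with --dry-run, queuing any file ends the scan immediately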
@@ -233,7 +282,7 @@ class ResumeCache(object):
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
         md5.update('\0'.join(realpaths))
         if any(os.path.isdir(path) for path in realpaths):
-            md5.update(str(max(args.max_manifest_depth, -1)))
+            md5.update("-1")
         elif args.filename:
             md5.update(args.filename)
         return os.path.join(
@@ -307,12 +356,15 @@ class ArvPutUploadJob(object):
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
-    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                 name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, replication_desired=None,
-                 filename=None, update_time=1.0, update_collection=None):
+    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+                 bytes_expected=None, name=None, owner_uuid=None,
+                 ensure_unique_name=False, num_retries=None,
+                 put_threads=None, replication_desired=None,
+                 filename=None, update_time=60.0, update_collection=None,
+                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
         self.paths = paths
         self.resume = resume
+        self.use_cache = use_cache
         self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
@@ -323,19 +375,32 @@ class ArvPutUploadJob(object):
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
         self.replication_desired = replication_desired
+        self.put_threads = put_threads
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
-        self._collection = None
         self._collection_lock = threading.Lock()
-        self._local_collection = None # Previous run collection manifest
-        self._file_paths = [] # Files to be updated in remote collection
+        self._remote_collection = None # Collection being updated (if asked)
+        self._local_collection = None # Collection from previous run manifest
+        self._file_paths = set() # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
+        self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
-        self.logger = logging.getLogger('arvados.arv_put')
+        self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self._upload_started = False
+        self.logger = logger
+        self.dry_run = dry_run
+        self._checkpoint_before_quit = True
+
+        if not self.use_cache and self.resume:
+            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
+
+        # Check for obvious dry-run responses
+        if self.dry_run and (not self.use_cache or not self.resume):
+            raise ArvPutUploadIsPending()
 
         # Load cached data if any and if needed
         self._setup_state(update_collection)
@@ -344,63 +409,93 @@ class ArvPutUploadJob(object):
         """
         Start supporting thread & file uploading
         """
-        self._checkpointer.daemon = True
-        self._checkpointer.start()
+        if not self.dry_run:
+            self._checkpointer.start()
         try:
             for path in self.paths:
                 # Test for stdin first, in case some file named '-' exist
                 if path == '-':
+                    if self.dry_run:
+                        raise ArvPutUploadIsPending()
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
-                    if path == '.' or path == './' or os.path.dirname(path) == '':
-                        dirname = ''
-                    else:
-                        dirname = os.path.dirname(path) + '/'
+                    # Use absolute paths on cache index so CWD doesn't interfere
+                    # with the caching logic.
+                    prefixdir = path = os.path.abspath(path)
+                    if prefixdir != '/':
+                        prefixdir += '/'
                     for root, dirs, files in os.walk(path):
                         # Make os.walk()'s dir traversing order deterministic
                         dirs.sort()
                         files.sort()
                         for f in files:
-                            self._write_file(os.path.join(root, f),
-                                             os.path.join(root[len(dirname):], f))
+                            self._check_file(os.path.join(root, f),
+                                             os.path.join(root[len(prefixdir):], f))
                 else:
-                    self._write_file(path, self.filename or os.path.basename(path))
-        finally:
-            # Stop the thread before doing anything else
-            self._stop_checkpointer.set()
-            self._checkpointer.join()
-            # Commit all & one last _update()
-            self.manifest_text()
-            if save_collection:
-                self.save_collection()
+                    self._check_file(os.path.abspath(path),
+                                     self.filename or os.path.basename(path))
+            # If dry-mode is on, and got up to this point, then we should notify that
+            # there aren't any file to upload.
+            if self.dry_run:
+                raise ArvPutUploadNotPending()
+            # Remove local_collection's files that don't exist locally anymore, so the
+            # bytes_written count is correct.
+            for f in self.collection_file_paths(self._local_collection,
+                                                path_prefix=""):
+                if f != 'stdin' and f != self.filename and not f in self._file_paths:
+                    self._local_collection.remove(f)
+            # Update bytes_written from current local collection and
+            # report initial progress.
             self._update()
-            self._cache_file.close()
-            # Correct the final written bytes count
-            self.bytes_written -= self.bytes_skipped
+            # Actual file upload
+            self._upload_started = True # Used by the update thread to start checkpointing
+            self._upload_files()
+        except (SystemExit, Exception) as e:
+            self._checkpoint_before_quit = False
+            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
+            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
+            #   we have a custom signal handler in place that raises SystemExit with
+            #   the catched signal's code.
+            if not isinstance(e, SystemExit) or e.code != -2:
+                self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e)))
+            raise
+        finally:
+            if not self.dry_run:
+                # Stop the thread before doing anything else
+                self._stop_checkpointer.set()
+                self._checkpointer.join()
+                if self._checkpoint_before_quit:
+                    # Commit all pending blocks & one last _update()
+                    self._local_collection.manifest_text()
+                    self._update(final=True)
+                    if save_collection:
+                        self.save_collection()
+            if self.use_cache:
+                self._cache_file.close()
 
     def save_collection(self):
         if self.update:
             # Check if files should be updated on the remote collection.
             for fp in self._file_paths:
-                remote_file = self._collection.find(fp)
+                remote_file = self._remote_collection.find(fp)
                 if not remote_file:
                     # File don't exist on remote collection, copy it.
-                    self._collection.copy(fp, fp, self._local_collection)
+                    self._remote_collection.copy(fp, fp, self._local_collection)
                 elif remote_file != self._local_collection.find(fp):
                     # A different file exist on remote collection, overwrite it.
-                    self._collection.copy(fp, fp, self._local_collection, overwrite=True)
+                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                 else:
                     # The file already exist on remote collection, skip it.
                     pass
-            self._collection.save(num_retries=self.num_retries)
+            self._remote_collection.save(num_retries=self.num_retries)
         else:
-            self._my_collection().save_new(
+            self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries)
 
     def destroy_cache(self):
-        if self.resume:
+        if self.use_cache:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
@@ -426,20 +521,34 @@ class ArvPutUploadJob(object):
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
-        while not self._stop_checkpointer.wait(self._update_task_time):
+        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
             self._update()
 
-    def _update(self):
+    def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
-            # Update cache, if resume enabled
-            with self._state_lock:
-                # Get the manifest text without comitting pending blocks
-                self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
-            self._save_state()
+        if self._upload_started:
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._local_collection)
+                if self.use_cache:
+                    if final:
+                        manifest = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without comitting pending blocks
+                        manifest = self._local_collection.manifest_text(strip=False,
+                                                                        normalize=False,
+                                                                        only_committed=True)
+                    # Update cache
+                    with self._state_lock:
+                        self._state['manifest'] = manifest
+            if self.use_cache:
+                try:
+                    self._save_state()
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+        else:
+            self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         self.report_progress()
 
@@ -452,13 +561,13 @@ class ArvPutUploadJob(object):
         self._write(sys.stdin, output)
         output.close()
 
-    def _write_file(self, source, filename):
+    def _check_file(self, source, filename):
+        """Check if this file needs to be uploaded"""
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
-
         # Record file path for updating the remote collection before exiting
-        self._file_paths.append(filename)
+        self._file_paths.add(filename)
 
         with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
@@ -485,22 +594,35 @@ class ArvPutUploadJob(object):
             if not file_in_local_collection:
                 # File not uploaded yet, upload it completely
                 should_upload = True
+            elif file_in_local_collection.permission_expired():
+                # Permission token expired, re-upload file. This will change whenever
+                # we have a API for refreshing tokens.
+                should_upload = True
+                self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
                 # File already there, skip it.
                 self.bytes_skipped += cached_file_data['size']
             elif cached_file_data['size'] > file_in_local_collection.size():
                 # File partially uploaded, resume!
                 resume_offset = file_in_local_collection.size()
+                self.bytes_skipped += resume_offset
                 should_upload = True
             else:
                 # Inconsistent cache, re-upload the file
                 should_upload = True
+                self._local_collection.remove(filename)
                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
         # Local file differs from cached data, re-upload it.
         else:
+            if file_in_local_collection:
+                self._local_collection.remove(filename)
             should_upload = True
 
         if should_upload:
+            self._files_to_upload.append((source, resume_offset, filename))
+
+    def _upload_files(self):
+        for source, resume_offset, filename in self._files_to_upload:
             with open(source, 'r') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
@@ -509,7 +631,6 @@ class ArvPutUploadJob(object):
                     # Start upload where we left off
                     output = self._local_collection.open(filename, 'a')
                     source_fd.seek(resume_offset)
-                    self.bytes_skipped += resume_offset
                 else:
                     # Start from scratch
                     output = self._local_collection.open(filename, 'w')
@@ -517,18 +638,14 @@ class ArvPutUploadJob(object):
                 output.close(flush=False)
 
     def _write(self, source_fd, output):
-        first_read = True
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            # Allow an empty file to be written
-            if not data and not first_read:
+            if not data:
                 break
-            if first_read:
-                first_read = False
             output.write(data)
 
     def _my_collection(self):
-        return self._local_collection
+        return self._remote_collection if self.update else self._local_collection
 
     def _setup_state(self, update_collection):
         """
@@ -538,7 +655,7 @@ class ArvPutUploadJob(object):
         if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                           update_collection):
             try:
-                self._collection = arvados.collection.Collection(update_collection)
+                self._remote_collection = arvados.collection.Collection(update_collection)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -547,35 +664,56 @@ class ArvPutUploadJob(object):
             # Collection locator provided, but unknown format
             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 
-        # Set up cache file name from input paths.
-        md5 = hashlib.md5()
-        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
-        realpaths = sorted(os.path.realpath(path) for path in self.paths)
-        md5.update('\0'.join(realpaths))
-        if self.filename:
-            md5.update(self.filename)
-        cache_filename = md5.hexdigest()
-        self._cache_file = open(os.path.join(
-            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-            cache_filename), 'a+')
-        self._cache_filename = self._cache_file.name
-        self._lock_file(self._cache_file)
-        self._cache_file.seek(0)
+        if self.use_cache:
+            # Set up cache file name from input paths.
+            md5 = hashlib.md5()
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            realpaths = sorted(os.path.realpath(path) for path in self.paths)
+            md5.update('\0'.join(realpaths))
+            if self.filename:
+                md5.update(self.filename)
+            cache_filename = md5.hexdigest()
+            cache_filepath = os.path.join(
+                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+                cache_filename)
+            if self.resume and os.path.exists(cache_filepath):
+                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
+                self._cache_file = open(cache_filepath, 'a+')
+            else:
+                # --no-resume means start with a empty cache file.
+                self.logger.info("Creating new cache file at {}".format(cache_filepath))
+                self._cache_file = open(cache_filepath, 'w+')
+            self._cache_filename = self._cache_file.name
+            self._lock_file(self._cache_file)
+            self._cache_file.seek(0)
+
         with self._state_lock:
-            try:
-                self._state = json.load(self._cache_file)
-                if not set(['manifest', 'files']).issubset(set(self._state.keys())):
-                    # Cache at least partially incomplete, set up new cache
+            if self.use_cache:
+                try:
+                    self._state = json.load(self._cache_file)
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache at least partially incomplete, set up new cache
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
+                except ValueError:
+                    # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-            except ValueError:
-                # Cache file empty, set up new cache
+            else:
+                self.logger.info("No cache usage requested for this run.")
+                # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
-
             # Load the previous manifest so we can check if files were modified remotely.
-            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
-        # Load how many bytes were uploaded on previous run
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
+            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
+
+    def collection_file_paths(self, col, path_prefix='.'):
+        """Return a list of file paths by recursively go through the entire collection `col`"""
+        file_paths = []
+        for name, item in col.items():
+            if isinstance(item, arvados.arvfile.ArvadosFile):
+                file_paths.append(os.path.join(path_prefix, name))
+            elif isinstance(item, arvados.collection.Subcollection):
+                new_prefix = os.path.join(path_prefix, name)
+                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
+        return file_paths
 
     def _lock_file(self, fileobj):
         try:
@@ -587,17 +725,19 @@ class ArvPutUploadJob(object):
         """
         Atomically save current state into cache.
         """
+        with self._state_lock:
+            # We're not using copy.deepcopy() here because it's a lot slower
+            # than json.dumps(), and we're already needing JSON format to be
+            # saved on disk.
+            state = json.dumps(self._state)
         try:
-            with self._state_lock:
-                state = self._state
-            new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_filename))
-            self._lock_file(new_cache_fd)
-            new_cache = os.fdopen(new_cache_fd, 'r+')
-            json.dump(state, new_cache)
+            new_cache = tempfile.NamedTemporaryFile(
+                dir=os.path.dirname(self._cache_filename), delete=False)
+            self._lock_file(new_cache)
+            new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_filename)
+            os.rename(new_cache.name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
             self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
@@ -609,21 +749,23 @@ class ArvPutUploadJob(object):
             self._cache_file = new_cache
 
     def collection_name(self):
-        with self._collection_lock:
-            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
-        return name
+        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def manifest_locator(self):
-        locator = self._my_collection().manifest_locator()
-        return locator
+        return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        datahash = self._my_collection().portable_data_hash()
-        return datahash
+        pdh = self._my_collection().portable_data_hash()
+        m = self._my_collection().stripped_manifest()
+        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
+        if pdh != local_pdh:
+            logger.warning("\n".join([
+                "arv-put: API server provided PDH differs from local manifest.",
+                "         This should not happen; showing API server version."]))
+        return pdh
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
-        return manifest
+        return self._my_collection().manifest_text(stream_name, strip, normalize)
 
     def _datablocks_on_item(self, item):
         """
@@ -706,6 +848,8 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
+    logger = logging.getLogger('arvados.arv_put')
+    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -714,7 +858,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
-            print >>stderr, "Cannot use --name with --stream or --raw"
+            logger.error("Cannot use --name with --stream or --raw")
+            sys.exit(1)
+        elif args.update_collection:
+            logger.error("Cannot use --name with --update-collection")
             sys.exit(1)
         collection_name = args.name
     else:
@@ -724,7 +871,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             socket.gethostname())
 
     if args.project_uuid and (args.stream or args.raw):
-        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
+        logger.error("Cannot use --project-uuid with --stream or --raw")
         sys.exit(1)
 
     # Determine the parent project
@@ -732,7 +879,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                             args.retries)
     except (apiclient_errors.Error, ValueError) as error:
-        print >>stderr, error
+        logger.error(error)
         sys.exit(1)
 
     if args.progress:
@@ -742,57 +889,72 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
+    # If this is used by a human, and there's at least one directory to be
+    # uploaded, the expected bytes calculation can take a moment.
+    if args.progress and any([os.path.isdir(f) for f in args.paths]):
+        logger.info("Calculating upload size, this could take some time...")
     bytes_expected = expected_bytes_for(args.paths)
 
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
+                                 use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
                                  bytes_expected = bytes_expected,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
+                                 put_threads = args.threads,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,
-                                 update_collection = args.update_collection)
+                                 update_collection = args.update_collection,
+                                 logger=logger,
+                                 dry_run=args.dry_run)
     except ResumeCacheConflict:
-        print >>stderr, "\n".join([
+        logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
-            "         Use --no-resume if this is really what you want."])
-        sys.exit(1)
-    except ResumeCacheInvalid as error:
-        print >>stderr, "\n".join([
-            "arv-put: %s" % str(error),
-            "         Use --no-resume or delete/move the cache file to upload to a new collection.",
-            "         Use --update-collection otherwise."])
+            "         Use --no-cache if this is really what you want."]))
         sys.exit(1)
     except CollectionUpdateError as error:
-        print >>stderr, "\n".join([
-            "arv-put: %s" % str(error)])
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
         sys.exit(1)
+    except ArvPutUploadIsPending:
+        # Dry run check successful, return proper exit code.
+        sys.exit(2)
+    except ArvPutUploadNotPending:
+        # No files pending for upload
+        sys.exit(0)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if not args.update_collection and args.resume and writer.bytes_written > 0:
-        print >>stderr, "\n".join([
-                "arv-put: Resuming previous upload from last checkpoint.",
-                "         Use the --no-resume option to start over."])
+    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
+        logger.warning("\n".join([
+            "arv-put: Resuming previous upload from last checkpoint.",
+            "         Use the --no-resume option to start over."]))
 
-    writer.report_progress()
+    if not args.dry_run:
+        writer.report_progress()
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
     except arvados.errors.ApiError as error:
-        print >>stderr, "\n".join([
-            "arv-put: %s" % str(error)])
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
         sys.exit(1)
+    except ArvPutUploadIsPending:
+        # Dry run check successful, return proper exit code.
+        sys.exit(2)
+    except ArvPutUploadNotPending:
+        # No files pending for upload
+        sys.exit(0)
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
-        print >>stderr
+        logger.info("\n")
 
     if args.stream:
         if args.normalize:
@@ -804,15 +966,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         try:
             if args.update_collection:
-                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
+                logger.info("Collection updated: '{}'".format(writer.collection_name()))
             else:
-                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
+                logger.info("Collection saved as '{}'".format(writer.collection_name()))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
                 output = writer.manifest_locator()
         except apiclient_errors.Error as error:
-            print >>stderr, (
+            logger.error(
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1