Merge branch '10700-dispatch'
[arvados.git] / sdk / python / arvados / commands / put.py
index 95964687b4a3538df1ebb5c541867c57b342cde7..5b46ba75b70d864589f681f1619500def41781d5 100644 (file)
@@ -24,6 +24,7 @@ import tempfile
 import threading
 import time
 from apiclient import errors as apiclient_errors
 import threading
 import time
 from apiclient import errors as apiclient_errors
+from arvados._version import __version__
 
 import arvados.commands._util as arv_cmd
 
 
 import arvados.commands._util as arv_cmd
 
@@ -32,6 +33,9 @@ api_client = None
 
 upload_opts = argparse.ArgumentParser(add_help=False)
 
 
 upload_opts = argparse.ArgumentParser(add_help=False)
 
+upload_opts.add_argument('--version', action='version',
+                         version="%s %s" % (sys.argv[0], __version__),
+                         help='Print version and exit.')
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
 Local file or directory. Default: read from standard input.
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
 Local file or directory. Default: read from standard input.
@@ -40,13 +44,7 @@ Local file or directory. Default: read from standard input.
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
-                    default=-1, help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
+                    default=-1, help=argparse.SUPPRESS)
 
 _group.add_argument('--normalize', action='store_true',
                     help="""
 
 _group.add_argument('--normalize', action='store_true',
                     help="""
@@ -54,6 +52,12 @@ Normalize the manifest by re-ordering files and streams after writing
 data.
 """)
 
 data.
 """)
 
+_group.add_argument('--dry-run', action='store_true', default=False,
+                    help="""
+Don't actually upload files, but only check if any file should be
+uploaded. Exit with code=2 when files are pending for upload.
+""")
+
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
@@ -97,8 +101,8 @@ separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
 manifest.
 """)
 
-_group.add_argument('--update-collection', type=str, default=None,
-                    dest='update_collection', metavar="UUID", help="""
+upload_opts.add_argument('--update-collection', type=str, default=None,
+                         dest='update_collection', metavar="UUID", help="""
 Update an existing collection identified by the given Arvados collection
 UUID. All new local files will be uploaded.
 """)
 Update an existing collection identified by the given Arvados collection
 UUID. All new local files will be uploaded.
 """)
@@ -129,6 +133,15 @@ physical storage devices (e.g., disks) should have a copy of each data
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
+upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
+                         help="""
+Set the number of upload threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 2 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
@@ -170,6 +183,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+                    help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+                    help="""
+Do not save upload state in a cache file for resuming.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -194,12 +217,17 @@ def parse_arguments(arguments):
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
+    # Turn off --resume (default) if --no-cache is used.
+    if not args.use_cache:
+        args.resume = False
+
     if args.paths == ['-']:
         if args.update_collection:
             arg_parser.error("""
     --update-collection cannot be used when reading from stdin.
     """)
         args.resume = False
     if args.paths == ['-']:
         if args.update_collection:
             arg_parser.error("""
     --update-collection cannot be used when reading from stdin.
     """)
         args.resume = False
+        args.use_cache = False
         if not args.filename:
             args.filename = 'stdin'
 
         if not args.filename:
             args.filename = 'stdin'
 
@@ -210,14 +238,33 @@ class CollectionUpdateError(Exception):
     pass
 
 
     pass
 
 
-class ResumeCacheInvalid(Exception):
+class ResumeCacheConflict(Exception):
+    pass
+
+
+class ArvPutArgumentConflict(Exception):
     pass
 
 
     pass
 
 
-class ResumeCacheConflict(Exception):
+class ArvPutUploadIsPending(Exception):
+    pass
+
+
+class ArvPutUploadNotPending(Exception):
     pass
 
 
     pass
 
 
+class FileUploadList(list):  # list subclass whose append() refuses to queue anything in dry-run mode
+    def __init__(self, dry_run=False):  # dry_run: when true, append() raises instead of storing
+        list.__init__(self)  # always starts out empty
+        self.dry_run = dry_run

+    def append(self, other):  # other: the item to queue; stored unchanged when not in dry-run mode
+        if self.dry_run:
+            raise ArvPutUploadIsPending()  # dry run: report that an upload would be needed rather than queueing it
+        super(FileUploadList, self).append(other)
+
+
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
@@ -233,7 +280,7 @@ class ResumeCache(object):
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
         md5.update('\0'.join(realpaths))
         if any(os.path.isdir(path) for path in realpaths):
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
         md5.update('\0'.join(realpaths))
         if any(os.path.isdir(path) for path in realpaths):
-            md5.update(str(max(args.max_manifest_depth, -1)))
+            md5.update("-1")
         elif args.filename:
             md5.update(args.filename)
         return os.path.join(
         elif args.filename:
             md5.update(args.filename)
         return os.path.join(
@@ -307,12 +354,15 @@ class ArvPutUploadJob(object):
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
-    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                 name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, replication_desired=None,
-                 filename=None, update_time=1.0, update_collection=None):
+    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+                 bytes_expected=None, name=None, owner_uuid=None,
+                 ensure_unique_name=False, num_retries=None,
+                 put_threads=None, replication_desired=None,
+                 filename=None, update_time=60.0, update_collection=None,
+                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
         self.paths = paths
         self.resume = resume
         self.paths = paths
         self.resume = resume
+        self.use_cache = use_cache
         self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
@@ -323,101 +373,114 @@ class ArvPutUploadJob(object):
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
         self.replication_desired = replication_desired
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
         self.replication_desired = replication_desired
+        self.put_threads = put_threads
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
-        self._collection = None
         self._collection_lock = threading.Lock()
         self._collection_lock = threading.Lock()
-        self._local_collection = None # Previous run collection manifest
+        self._remote_collection = None # Collection being updated (if asked)
+        self._local_collection = None # Collection from previous run manifest
         self._file_paths = [] # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._file_paths = [] # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
+        self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._update_task_time = update_time  # How many seconds wait between update runs
-        self.logger = logging.getLogger('arvados.arv_put')
+        self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self.logger = logger
+        self.dry_run = dry_run
 
 
-        # Load an already existing collection for update
-        if update_collection and re.match(arvados.util.collection_uuid_pattern,
-                                          update_collection):
-            try:
-                self._collection = arvados.collection.Collection(update_collection)
-            except arvados.errors.ApiError as error:
-                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
-            else:
-                self.update = True
-        elif update_collection:
-            # Collection locator provided, but unknown format
-            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
-        else:
-            # No collection asked for update, set up an empty one.
-            self._collection = arvados.collection.Collection(replication_desired=self.replication_desired)
+        if not self.use_cache and self.resume:
+            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
+
+        # Check for obvious dry-run responses
+        if self.dry_run and (not self.use_cache or not self.resume):
+            raise ArvPutUploadIsPending()
 
         # Load cached data if any and if needed
 
         # Load cached data if any and if needed
-        self._setup_state()
+        self._setup_state(update_collection)
 
     def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
 
     def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
-        self._checkpointer.daemon = True
-        self._checkpointer.start()
+        if not self.dry_run:
+            self._checkpointer.start()
         try:
             for path in self.paths:
                 # Test for stdin first, in case some file named '-' exist
                 if path == '-':
         try:
             for path in self.paths:
                 # Test for stdin first, in case some file named '-' exist
                 if path == '-':
+                    if self.dry_run:
+                        raise ArvPutUploadIsPending()
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
-                    if path == '.' or path == './' or os.path.dirname(path) == '':
-                        dirname = ''
-                    else:
-                        dirname = os.path.dirname(path) + '/'
+                    # Use absolute paths on cache index so CWD doesn't interfere
+                    # with the caching logic.
+                    prefixdir = path = os.path.abspath(path)
+                    if prefixdir != '/':
+                        prefixdir += '/'
                     for root, dirs, files in os.walk(path):
                         # Make os.walk()'s dir traversing order deterministic
                         dirs.sort()
                         files.sort()
                         for f in files:
                     for root, dirs, files in os.walk(path):
                         # Make os.walk()'s dir traversing order deterministic
                         dirs.sort()
                         files.sort()
                         for f in files:
-                            self._write_file(os.path.join(root, f),
-                                             os.path.join(root[len(dirname):], f))
+                            self._check_file(os.path.join(root, f),
+                                             os.path.join(root[len(prefixdir):], f))
                 else:
                 else:
-                    self._write_file(path, self.filename or os.path.basename(path))
-        finally:
-            # Stop the thread before doing anything else
-            self._stop_checkpointer.set()
-            self._checkpointer.join()
-            # Commit all & one last _update()
-            self.manifest_text()
-            if save_collection:
-                self.save_collection()
+                    self._check_file(os.path.abspath(path),
+                                     self.filename or os.path.basename(path))
+            # If dry-run mode is on and we got up to this point, then we should
+            # notify that there aren't any files to upload.
+            if self.dry_run:
+                raise ArvPutUploadNotPending()
+            # Remove local_collection's files that don't exist locally anymore, so the
+            # bytes_written count is correct.
+            for f in self.collection_file_paths(self._local_collection,
+                                                path_prefix=""):
+                if f != 'stdin' and f != self.filename and not f in self._file_paths:
+                    self._local_collection.remove(f)
+            # Update bytes_written from current local collection and
+            # report initial progress.
             self._update()
             self._update()
-            self._cache_file.close()
-            # Correct the final written bytes count
-            self.bytes_written -= self.bytes_skipped
+            # Actual file upload
+            self._upload_files()
+        finally:
+            if not self.dry_run:
+                # Stop the thread before doing anything else
+                self._stop_checkpointer.set()
+                self._checkpointer.join()
+                # Commit all pending blocks & one last _update()
+                self._local_collection.manifest_text()
+                self._update(final=True)
+                if save_collection:
+                    self.save_collection()
+            if self.use_cache:
+                self._cache_file.close()
 
     def save_collection(self):
 
     def save_collection(self):
-        with self._collection_lock:
-            if self.update:
-                # Check if files should be updated on the remote collection.
-                for fp in self._file_paths:
-                    remote_file = self._collection.find(fp)
-                    if not remote_file:
-                        # File don't exist on remote collection, copy it.
-                        self._collection.copy(fp, fp, self._local_collection)
-                    elif remote_file != self._local_collection.find(fp):
-                        # A different file exist on remote collection, overwrite it.
-                        self._collection.copy(fp, fp, self._local_collection, overwrite=True)
-                    else:
-                        # The file already exist on remote collection, skip it.
-                        pass
-                self._collection.save(num_retries=self.num_retries)
-            else:
-                self._my_collection().save_new(
-                    name=self.name, owner_uuid=self.owner_uuid,
-                    ensure_unique_name=self.ensure_unique_name,
-                    num_retries=self.num_retries)
+        if self.update:
+            # Check if files should be updated on the remote collection.
+            for fp in self._file_paths:
+                remote_file = self._remote_collection.find(fp)
+                if not remote_file:
+                    # File don't exist on remote collection, copy it.
+                    self._remote_collection.copy(fp, fp, self._local_collection)
+                elif remote_file != self._local_collection.find(fp):
+                    # A different file exist on remote collection, overwrite it.
+                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
+                else:
+                    # The file already exist on remote collection, skip it.
+                    pass
+            self._remote_collection.save(num_retries=self.num_retries)
+        else:
+            self._local_collection.save_new(
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
 
     def destroy_cache(self):
 
     def destroy_cache(self):
-        if self.resume:
+        if self.use_cache:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
@@ -446,16 +509,24 @@ class ArvPutUploadJob(object):
         while not self._stop_checkpointer.wait(self._update_task_time):
             self._update()
 
         while not self._stop_checkpointer.wait(self._update_task_time):
             self._update()
 
-    def _update(self):
+    def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
         with self._collection_lock:
             self.bytes_written = self._collection_size(self._local_collection)
         """
         Update cached manifest text and report progress.
         """
         with self._collection_lock:
             self.bytes_written = self._collection_size(self._local_collection)
-            # Update cache, if resume enabled
-            with self._state_lock:
-                # Get the manifest text without comitting pending blocks
-                self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+            if self.use_cache:
+                if final:
+                    manifest = self._local_collection.manifest_text()
+                else:
+                    # Get the manifest text without committing pending blocks
+                    manifest = self._local_collection.manifest_text(strip=False,
+                                                                    normalize=False,
+                                                                    only_committed=True)
+                # Update cache
+                with self._state_lock:
+                    self._state['manifest'] = manifest
+        if self.use_cache:
             self._save_state()
         # Call the reporter, if any
         self.report_progress()
             self._save_state()
         # Call the reporter, if any
         self.report_progress()
@@ -465,16 +536,15 @@ class ArvPutUploadJob(object):
             self.reporter(self.bytes_written, self.bytes_expected)
 
     def _write_stdin(self, filename):
             self.reporter(self.bytes_written, self.bytes_expected)
 
     def _write_stdin(self, filename):
-        with self._collection_lock:
-            output = self._local_collection.open(filename, 'w')
+        output = self._local_collection.open(filename, 'w')
         self._write(sys.stdin, output)
         output.close()
 
         self._write(sys.stdin, output)
         output.close()
 
-    def _write_file(self, source, filename):
+    def _check_file(self, source, filename):
+        """Check if this file needs to be uploaded"""
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
-
         # Record file path for updating the remote collection before exiting
         self._file_paths.append(filename)
 
         # Record file path for updating the remote collection before exiting
         self._file_paths.append(filename)
 
@@ -490,8 +560,7 @@ class ArvPutUploadJob(object):
             cached_file_data = self._state['files'][source]
 
         # Check if file was already uploaded (at least partially)
             cached_file_data = self._state['files'][source]
 
         # Check if file was already uploaded (at least partially)
-        with self._collection_lock:
-            file_in_local_collection = self._local_collection.find(filename)
+        file_in_local_collection = self._local_collection.find(filename)
 
         # If not resuming, upload the full file.
         if not self.resume:
 
         # If not resuming, upload the full file.
         if not self.resume:
@@ -504,85 +573,123 @@ class ArvPutUploadJob(object):
             if not file_in_local_collection:
                 # File not uploaded yet, upload it completely
                 should_upload = True
             if not file_in_local_collection:
                 # File not uploaded yet, upload it completely
                 should_upload = True
+            elif file_in_local_collection.permission_expired():
+                # Permission token expired, re-upload file. This will change whenever
+                # we have an API for refreshing tokens.
+                should_upload = True
+                self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
                 # File already there, skip it.
                 self.bytes_skipped += cached_file_data['size']
             elif cached_file_data['size'] > file_in_local_collection.size():
                 # File partially uploaded, resume!
                 resume_offset = file_in_local_collection.size()
             elif cached_file_data['size'] == file_in_local_collection.size():
                 # File already there, skip it.
                 self.bytes_skipped += cached_file_data['size']
             elif cached_file_data['size'] > file_in_local_collection.size():
                 # File partially uploaded, resume!
                 resume_offset = file_in_local_collection.size()
+                self.bytes_skipped += resume_offset
                 should_upload = True
             else:
                 # Inconsistent cache, re-upload the file
                 should_upload = True
                 should_upload = True
             else:
                 # Inconsistent cache, re-upload the file
                 should_upload = True
+                self._local_collection.remove(filename)
                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
         # Local file differs from cached data, re-upload it.
         else:
                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
         # Local file differs from cached data, re-upload it.
         else:
+            if file_in_local_collection:
+                self._local_collection.remove(filename)
             should_upload = True
 
         if should_upload:
             should_upload = True
 
         if should_upload:
+            self._files_to_upload.append((source, resume_offset, filename))
+
+    def _upload_files(self):
+        for source, resume_offset, filename in self._files_to_upload:
             with open(source, 'r') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
                     self._state['files'][source]['size'] = os.path.getsize(source)
                 if resume_offset > 0:
                     # Start upload where we left off
             with open(source, 'r') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
                     self._state['files'][source]['size'] = os.path.getsize(source)
                 if resume_offset > 0:
                     # Start upload where we left off
-                    with self._collection_lock:
-                        output = self._local_collection.open(filename, 'a')
+                    output = self._local_collection.open(filename, 'a')
                     source_fd.seek(resume_offset)
                     source_fd.seek(resume_offset)
-                    self.bytes_skipped += resume_offset
                 else:
                     # Start from scratch
                 else:
                     # Start from scratch
-                    with self._collection_lock:
-                        output = self._local_collection.open(filename, 'w')
+                    output = self._local_collection.open(filename, 'w')
                 self._write(source_fd, output)
                 output.close(flush=False)
 
     def _write(self, source_fd, output):
                 self._write(source_fd, output)
                 output.close(flush=False)
 
     def _write(self, source_fd, output):
-        first_read = True
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            # Allow an empty file to be written
-            if not data and not first_read:
+            if not data:
                 break
                 break
-            if first_read:
-                first_read = False
             output.write(data)
 
     def _my_collection(self):
             output.write(data)
 
     def _my_collection(self):
-        return self._local_collection
+        return self._remote_collection if self.update else self._local_collection
 
 
-    def _setup_state(self):
+    def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
         """
         """
         Create a new cache file or load a previously existing one.
         """
-        md5 = hashlib.md5()
-        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
-        realpaths = sorted(os.path.realpath(path) for path in self.paths)
-        md5.update('\0'.join(realpaths))
-        if self.filename:
-            md5.update(self.filename)
-        cache_filename = md5.hexdigest()
-        self._cache_file = open(os.path.join(
-            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-            cache_filename), 'a+')
-        self._cache_filename = self._cache_file.name
-        self._lock_file(self._cache_file)
-        self._cache_file.seek(0)
-        with self._state_lock:
+        # Load an already existing collection for update
+        if update_collection and re.match(arvados.util.collection_uuid_pattern,
+                                          update_collection):
             try:
             try:
-                self._state = json.load(self._cache_file)
-                if not set(['manifest', 'files']).issubset(set(self._state.keys())):
-                    # Cache at least partially incomplete, set up new cache
+                self._remote_collection = arvados.collection.Collection(update_collection)
+            except arvados.errors.ApiError as error:
+                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
+            else:
+                self.update = True
+        elif update_collection:
+            # Collection locator provided, but unknown format
+            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+
+        if self.use_cache:
+            # Set up cache file name from input paths.
+            md5 = hashlib.md5()
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            realpaths = sorted(os.path.realpath(path) for path in self.paths)
+            md5.update('\0'.join(realpaths))
+            if self.filename:
+                md5.update(self.filename)
+            cache_filename = md5.hexdigest()
+            cache_filepath = os.path.join(
+                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+                cache_filename)
+            if self.resume:
+                self._cache_file = open(cache_filepath, 'a+')
+            else:
+                # --no-resume means start with an empty cache file.
+                self._cache_file = open(cache_filepath, 'w+')
+            self._cache_filename = self._cache_file.name
+            self._lock_file(self._cache_file)
+            self._cache_file.seek(0)
+
+        with self._state_lock:
+            if self.use_cache:
+                try:
+                    self._state = json.load(self._cache_file)
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache at least partially incomplete, set up new cache
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
+                except ValueError:
+                    # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-            except ValueError:
-                # Cache file empty, set up new cache
+            else:
+                # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
                 self._state = copy.deepcopy(self.EMPTY_STATE)
-
             # Load the previous manifest so we can check if files were modified remotely.
             # Load the previous manifest so we can check if files were modified remotely.
-            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
-        # Load how many bytes were uploaded on previous run
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
+            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
+
+    def collection_file_paths(self, col, path_prefix='.'):  # col: collection to walk; path_prefix: path joined in front of every name
+        """Return a list of file paths by recursively go through the entire collection `col`"""
+        file_paths = []
+        for name, item in col.items():  # visits each (name, item) pair at this level of the collection
+            if isinstance(item, arvados.arvfile.ArvadosFile):  # plain file: record its prefixed path
+                file_paths.append(os.path.join(path_prefix, name))
+            elif isinstance(item, arvados.collection.Subcollection):  # subcollection: descend with the extended prefix
+                new_prefix = os.path.join(path_prefix, name)
+                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
+        return file_paths  # items of any other type are left out of the result
 
     def _lock_file(self, fileobj):
         try:
 
     def _lock_file(self, fileobj):
         try:
@@ -596,7 +703,7 @@ class ArvPutUploadJob(object):
         """
         try:
             with self._state_lock:
         """
         try:
             with self._state_lock:
-                state = self._state
+                state = copy.deepcopy(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
@@ -616,24 +723,16 @@ class ArvPutUploadJob(object):
             self._cache_file = new_cache
 
     def collection_name(self):
             self._cache_file = new_cache
 
     def collection_name(self):
-        with self._collection_lock:
-            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
-        return name
+        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def manifest_locator(self):
 
     def manifest_locator(self):
-        with self._collection_lock:
-            locator = self._my_collection().manifest_locator()
-        return locator
+        return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
 
     def portable_data_hash(self):
-        with self._collection_lock:
-            datahash = self._my_collection().portable_data_hash()
-        return datahash
+        return self._my_collection().portable_data_hash()
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        with self._collection_lock:
-            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
-        return manifest
+        return self._my_collection().manifest_text(stream_name, strip, normalize)
 
     def _datablocks_on_item(self, item):
         """
 
     def _datablocks_on_item(self, item):
         """
@@ -716,6 +815,7 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
+    logger = logging.getLogger('arvados.arv_put')
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -724,7 +824,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
-            print >>stderr, "Cannot use --name with --stream or --raw"
+            logger.error("Cannot use --name with --stream or --raw")
+            sys.exit(1)
+        elif args.update_collection:
+            logger.error("Cannot use --name with --update-collection")
             sys.exit(1)
         collection_name = args.name
     else:
             sys.exit(1)
         collection_name = args.name
     else:
@@ -734,7 +837,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             socket.gethostname())
 
     if args.project_uuid and (args.stream or args.raw):
             socket.gethostname())
 
     if args.project_uuid and (args.stream or args.raw):
-        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
+        logger.error("Cannot use --project-uuid with --stream or --raw")
         sys.exit(1)
 
     # Determine the parent project
         sys.exit(1)
 
     # Determine the parent project
@@ -742,7 +845,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                             args.retries)
     except (apiclient_errors.Error, ValueError) as error:
         project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                             args.retries)
     except (apiclient_errors.Error, ValueError) as error:
-        print >>stderr, error
+        logger.error(error)
         sys.exit(1)
 
     if args.progress:
         sys.exit(1)
 
     if args.progress:
@@ -757,52 +860,63 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
+                                 use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
                                  bytes_expected = bytes_expected,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  filename = args.filename,
                                  reporter = reporter,
                                  bytes_expected = bytes_expected,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
+                                 put_threads = args.threads,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,
-                                 update_collection = args.update_collection)
+                                 update_collection = args.update_collection,
+                                 logger=logger,
+                                 dry_run=args.dry_run)
     except ResumeCacheConflict:
     except ResumeCacheConflict:
-        print >>stderr, "\n".join([
+        logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
             "arv-put: Another process is already uploading this data.",
-            "         Use --no-resume if this is really what you want."])
-        sys.exit(1)
-    except ResumeCacheInvalid as error:
-        print >>stderr, "\n".join([
-            "arv-put: %s" % str(error),
-            "         Use --no-resume or delete/move the cache file to upload to a new collection.",
-            "         Use --update-collection otherwise."])
+            "         Use --no-cache if this is really what you want."]))
         sys.exit(1)
     except CollectionUpdateError as error:
         sys.exit(1)
     except CollectionUpdateError as error:
-        print >>stderr, "\n".join([
-            "arv-put: %s" % str(error)])
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
         sys.exit(1)
         sys.exit(1)
+    except ArvPutUploadIsPending:
+        # Dry run check successful, return proper exit code.
+        sys.exit(2)
+    except ArvPutUploadNotPending:
+        # No files pending for upload
+        sys.exit(0)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if not args.update_collection and args.resume and writer.bytes_written > 0:
-        print >>stderr, "\n".join([
-                "arv-put: Resuming previous upload from last checkpoint.",
-                "         Use the --no-resume option to start over."])
+    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
+        logger.warning("\n".join([
+            "arv-put: Resuming previous upload from last checkpoint.",
+            "         Use the --no-resume option to start over."]))
 
 
-    writer.report_progress()
+    if not args.dry_run:
+        writer.report_progress()
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
     except arvados.errors.ApiError as error:
     output = None
     try:
         writer.start(save_collection=not(args.stream or args.raw))
     except arvados.errors.ApiError as error:
-        print >>stderr, "\n".join([
-            "arv-put: %s" % str(error)])
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
         sys.exit(1)
         sys.exit(1)
+    except ArvPutUploadIsPending:
+        # Dry run check successful, return proper exit code.
+        sys.exit(2)
+    except ArvPutUploadNotPending:
+        # No files pending for upload
+        sys.exit(0)
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
-        print >>stderr
+        logger.info("\n")
 
     if args.stream:
         if args.normalize:
 
     if args.stream:
         if args.normalize:
@@ -814,15 +928,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         try:
             if args.update_collection:
     else:
         try:
             if args.update_collection:
-                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
+                logger.info("Collection updated: '{}'".format(writer.collection_name()))
             else:
             else:
-                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
+                logger.info("Collection saved as '{}'".format(writer.collection_name()))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
                 output = writer.manifest_locator()
         except apiclient_errors.Error as error:
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
                 output = writer.manifest_locator()
         except apiclient_errors.Error as error:
-            print >>stderr, (
+            logger.error(
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1