10315: Bring back arv-put command from 9701's branch.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 0645cb49a7515a68f83d0276c8f3b1ca853138e3..89753a22863808c3fb89686cbf9948dae8760171 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -3,9 +3,9 @@
 # TODO:
 # --md5sum - display md5 of each file as read from disk
 
-import apiclient.errors
 import argparse
 import arvados
+import arvados.collection
 import base64
 import datetime
 import errno
@@ -14,10 +14,15 @@ import hashlib
 import json
 import os
 import pwd
+import time
 import signal
 import socket
 import sys
 import tempfile
+import threading
+import copy
+import logging
+from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
 
@@ -27,11 +32,13 @@ api_client = None
 upload_opts = argparse.ArgumentParser(add_help=False)
 
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
-                    help="""
+                         help="""
 Local file or directory. Default: read from standard input.
 """)
 
-upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
+_group = upload_opts.add_mutually_exclusive_group()
+
+_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                     default=-1, help="""
 Maximum depth of directory tree to represent in the manifest
 structure. A directory structure deeper than this will be represented
@@ -40,56 +47,62 @@ a single stream. Default: -1 (unlimited), i.e., exactly one manifest
 stream per filesystem directory that contains files.
 """)
 
+_group.add_argument('--normalize', action='store_true',
+                    help="""
+Normalize the manifest by re-ordering files and streams after writing
+data.
+""")
+
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
-                   help="""
+                    help="""
 Synonym for --stream.
 """)
 
 _group.add_argument('--stream', action='store_true',
-                   help="""
+                    help="""
 Store the file content and display the resulting manifest on
 stdout. Do not write the manifest to Keep or save a Collection object
 in Arvados.
 """)
 
 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
-                   help="""
+                    help="""
 Synonym for --manifest.
 """)
 
 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
-                   help="""
+                    help="""
 Synonym for --manifest.
 """)
 
 _group.add_argument('--manifest', action='store_true',
-                   help="""
+                    help="""
 Store the file data and resulting manifest in Keep, save a Collection
 object in Arvados, and display the manifest locator (Collection uuid)
 on stdout. This is the default behavior.
 """)
 
 _group.add_argument('--as-raw', action='store_true', dest='raw',
-                   help="""
+                    help="""
 Synonym for --raw.
 """)
 
 _group.add_argument('--raw', action='store_true',
-                   help="""
+                    help="""
 Store the file content and display the data block locators on stdout,
 separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
 upload_opts.add_argument('--use-filename', type=str, default=None,
-                    dest='filename', help="""
+                         dest='filename', help="""
 Synonym for --filename.
 """)
 
 upload_opts.add_argument('--filename', type=str, default=None,
-                    help="""
+                         help="""
 Use the given filename in the manifest, instead of the name of the
 local file. This is useful when "-" or "/dev/stdin" is given as an
 input file. It can be used only if there is exactly one path given and
@@ -97,11 +110,18 @@ it is not a directory. Implies --manifest.
 """)
 
 upload_opts.add_argument('--portable-data-hash', action='store_true',
 """)
 
 upload_opts.add_argument('--portable-data-hash', action='store_true',
-                    help="""
+                         help="""
 Print the portable data hash instead of the Arvados UUID for the collection
 created by the upload.
 """)
 
+upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
+                         help="""
+Set the replication level for the new collection: how many different
+physical storage devices (e.g., disks) should have a copy of each data
+block. Default is to use the server-provided default (if any) or 2.
+""")
+
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
@@ -115,43 +135,45 @@ Save the collection with the specified name.
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
-                   help="""
+                    help="""
 Display human-readable progress on stderr (bytes and, if possible,
 percentage of total data size). This is the default behavior when
 stderr is a tty.
 """)
 
 _group.add_argument('--no-progress', action='store_true',
-                   help="""
+                    help="""
 Do not display human-readable progress on stderr, even if stderr is a
 tty.
 """)
 
 _group.add_argument('--batch-progress', action='store_true',
-                   help="""
+                    help="""
 Display machine-readable progress on stderr (bytes and, if known,
 total data size).
 """)
 
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
-                   help="""
+                    help="""
 Continue interrupted uploads from cached state (default).
 """)
 _group.add_argument('--no-resume', action='store_false', dest='resume',
-                   help="""
+                    help="""
 Do not continue interrupted uploads from cached state.
 """)
 
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
-    parents=[upload_opts, run_opts])
+    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 
 def parse_arguments(arguments):
     args = arg_parser.parse_args(arguments)
 
     if len(args.paths) == 0:
-        args.paths += ['/dev/stdin']
+        args.paths = ['-']
+
+    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
 
     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
         if args.filename:
@@ -166,9 +188,9 @@ def parse_arguments(arguments):
         args.progress = True
 
     if args.paths == ['-']:
-        args.paths = ['/dev/stdin']
+        args.resume = False
         if not args.filename:
-            args.filename = '-'
+            args.filename = 'stdin'
 
     return args
 
@@ -208,6 +230,23 @@ class ResumeCache(object):
         self.cache_file.seek(0)
         return json.load(self.cache_file)
 
+    def check_cache(self, api_client=None, num_retries=0):
+        try:
+            state = self.load()
+            locator = None
+            try:
+                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
+                    locator = state["_finished_streams"][0][1][0]
+                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
+                    locator = state["_current_stream_locators"][0]
+                if locator is not None:
+                    kc = arvados.keep.KeepClient(api_client=api_client)
+                    kc.head(locator, num_retries=num_retries)
+            except Exception as e:
+                self.restart()
+        except (ValueError):
+            pass
+
     def save(self, data):
         try:
             new_cache_fd, new_cache_name = tempfile.mkstemp(
@@ -241,75 +280,344 @@ class ResumeCache(object):
         self.__init__(self.filename)
 
 
-class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
-    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
-                   ['bytes_written', '_seen_inputs'])
-
-    def __init__(self, cache=None, reporter=None, bytes_expected=None,
-                 api_client=None):
-        self.bytes_written = 0
-        self._seen_inputs = []
-        self.cache = cache
+class ArvPutUploadJob(object):
+    CACHE_DIR = '.cache/arvados/arv-put'
+    EMPTY_STATE = {
+        'manifest' : None, # Last saved manifest checkpoint
+        'files' : {} # Previous run file list: {path : {size, mtime}}
+    }
+
+    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
+                 name=None, owner_uuid=None, ensure_unique_name=False,
+                 num_retries=None, replication_desired=None,
+                 filename=None, update_time=60.0):
+        self.paths = paths
+        self.resume = resume
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__(api_client)
-
-    @classmethod
-    def from_cache(cls, cache, reporter=None, bytes_expected=None):
+        self.bytes_written = 0
+        self.bytes_skipped = 0
+        self.name = name
+        self.owner_uuid = owner_uuid
+        self.ensure_unique_name = ensure_unique_name
+        self.num_retries = num_retries
+        self.replication_desired = replication_desired
+        self.filename = filename
+        self._state_lock = threading.Lock()
+        self._state = None # Previous run state (file list & manifest)
+        self._current_files = [] # Current run file list
+        self._cache_file = None
+        self._collection = None
+        self._collection_lock = threading.Lock()
+        self._stop_checkpointer = threading.Event()
+        self._checkpointer = threading.Thread(target=self._update_task)
+        self._update_task_time = update_time  # How many seconds to wait between update runs
+        self.logger = logging.getLogger('arvados.arv_put')
+        # Load cached data if any and if needed
+        self._setup_state()
+
+    def start(self):
+        """
+        Start supporting thread & file uploading
+        """
+        self._checkpointer.daemon = True
+        self._checkpointer.start()
         try:
-            state = cache.load()
-            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state, cache, reporter, bytes_expected)
-        except (TypeError, ValueError,
-                arvados.errors.StaleWriterStateError) as error:
-            return cls(cache, reporter, bytes_expected)
-        else:
-            return writer
-
-    def cache_state(self):
-        if self.cache is None:
-            return
-        state = self.dump_state()
-        # Transform attributes for serialization.
-        for attr, value in state.items():
-            if attr == '_data_buffer':
-                state[attr] = base64.encodestring(''.join(value))
-            elif hasattr(value, 'popleft'):
-                state[attr] = list(value)
-        self.cache.save(state)
+            for path in self.paths:
+                # Test for stdin first, in case some file named '-' exists
+                if path == '-':
+                    self._write_stdin(self.filename or 'stdin')
+                elif os.path.isdir(path):
+                    self._write_directory_tree(path)
+                else:
+                    self._write_file(path, self.filename or os.path.basename(path))
+        finally:
+            # Stop the thread before doing anything else
+            self._stop_checkpointer.set()
+            self._checkpointer.join()
+            # Commit all & one last _update()
+            self.manifest_text()
+            self._update()
+            if self.resume:
+                self._cache_file.close()
+                # Correct the final written bytes count
+                self.bytes_written -= self.bytes_skipped
+
+    def save_collection(self):
+        with self._collection_lock:
+            self._my_collection().save_new(
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
+
+    def destroy_cache(self):
+        if self.resume:
+            try:
+                os.unlink(self._cache_filename)
+            except OSError as error:
+                # That's what we wanted anyway.
+                if error.errno != errno.ENOENT:
+                    raise
+            self._cache_file.close()
+
+    def _collection_size(self, collection):
+        """
+        Recursively get the total size of the collection
+        """
+        size = 0
+        for item in collection.values():
+            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
+                size += self._collection_size(item)
+            else:
+                size += item.size()
+        return size
+
+    def _update_task(self):
+        """
+        Periodically called support task. File uploading is
+        asynchronous so we poll status from the collection.
+        """
+        while not self._stop_checkpointer.wait(self._update_task_time):
+            self._update()
+
+    def _update(self):
+        """
+        Update cached manifest text and report progress.
+        """
+        with self._collection_lock:
+            self.bytes_written = self._collection_size(self._my_collection())
+            # Update cache, if resume enabled
+            if self.resume:
+                with self._state_lock:
+                    # Get the manifest text without committing pending blocks
+                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+        if self.resume:
+            self._save_state()
+        # Call the reporter, if any
+        self.report_progress()
 
     def report_progress(self):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def flush_data(self):
-        start_buffer_len = self._data_buffer_len
-        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
-        super(ArvPutCollectionWriter, self).flush_data()
-        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
-            self.bytes_written += (start_buffer_len - self._data_buffer_len)
-            self.report_progress()
-            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
-                self.cache_state()
-
-    def _record_new_input(self, input_type, source_name, dest_name):
-        # The key needs to be a list because that's what we'll get back
-        # from JSON deserialization.
-        key = [input_type, source_name, dest_name]
-        if key in self._seen_inputs:
-            return False
-        self._seen_inputs.append(key)
-        return True
-
-    def write_file(self, source, filename=None):
-        if self._record_new_input('file', source, filename):
-            super(ArvPutCollectionWriter, self).write_file(source, filename)
-
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        if self._record_new_input('directory', path, stream_name):
-            super(ArvPutCollectionWriter, self).write_directory_tree(
-                path, stream_name, max_manifest_depth)
+    def _write_directory_tree(self, path, stream_name="."):
+        # TODO: Check what happens when multiple directories are passed as
+        # arguments.
+        # If the code below is uncommented, integration test
+        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
+        # fails, I suppose it is because the manifest_uuid changes because
+        # of the dir addition to stream_name.
+
+        # if stream_name == '.':
+        #     stream_name = os.path.join('.', os.path.basename(path))
+        for item in os.listdir(path):
+            if os.path.isdir(os.path.join(path, item)):
+                self._write_directory_tree(os.path.join(path, item),
+                                os.path.join(stream_name, item))
+            else:
+                self._write_file(os.path.join(path, item),
+                                os.path.join(stream_name, item))
+
+    def _write_stdin(self, filename):
+        with self._collection_lock:
+            output = self._my_collection().open(filename, 'w')
+        self._write(sys.stdin, output)
+        output.close()
+
+    def _write_file(self, source, filename):
+        resume_offset = 0
+        if self.resume:
+            # Check if file was already uploaded (at least partially)
+            with self._collection_lock:
+                try:
+                    file_in_collection = self._my_collection().find(filename)
+                except IOError:
+                    # Not found
+                    file_in_collection = None
+            # If no previous cached data on this file, store it for an eventual
+            # repeated run.
+            if source not in self._state['files']:
+                with self._state_lock:
+                    self._state['files'][source] = {
+                        'mtime': os.path.getmtime(source),
+                        'size' : os.path.getsize(source)
+                    }
+            with self._state_lock:
+                cached_file_data = self._state['files'][source]
+            # See if this file was already uploaded at least partially
+            if file_in_collection:
+                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+                    if cached_file_data['size'] == file_in_collection.size():
+                        # File already there, skip it.
+                        self.bytes_skipped += cached_file_data['size']
+                        return
+                    elif cached_file_data['size'] > file_in_collection.size():
+                        # File partially uploaded, resume!
+                        resume_offset = file_in_collection.size()
+                    else:
+                        # Inconsistent cache, re-upload the file
+                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+                else:
+                    # Local file differs from cached data, re-upload it
+                    pass
+        with open(source, 'r') as source_fd:
+            if resume_offset > 0:
+                # Start upload where we left off
+                with self._collection_lock:
+                    output = self._my_collection().open(filename, 'a')
+                source_fd.seek(resume_offset)
+                self.bytes_skipped += resume_offset
+            else:
+                # Start from scratch
+                with self._collection_lock:
+                    output = self._my_collection().open(filename, 'w')
+            self._write(source_fd, output)
+            output.close()
+
+    def _write(self, source_fd, output):
+        first_read = True
+        while True:
+            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+            # Allow an empty file to be written
+            if not data and not first_read:
+                break
+            if first_read:
+                first_read = False
+            output.write(data)
+
+    def _my_collection(self):
+        """
+        Create a new collection if none cached. Load it from cache otherwise.
+        """
+        if self._collection is None:
+            with self._state_lock:
+                manifest = self._state['manifest']
+            if self.resume and manifest is not None:
+                # Create collection from saved state
+                self._collection = arvados.collection.Collection(
+                    manifest,
+                    replication_desired=self.replication_desired)
+            else:
+                # Create new collection
+                self._collection = arvados.collection.Collection(
+                    replication_desired=self.replication_desired)
+        return self._collection
+
+    def _setup_state(self):
+        """
+        Create a new cache file or load a previously existing one.
+        """
+        if self.resume:
+            md5 = hashlib.md5()
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            realpaths = sorted(os.path.realpath(path) for path in self.paths)
+            md5.update('\0'.join(realpaths))
+            if self.filename:
+                md5.update(self.filename)
+            cache_filename = md5.hexdigest()
+            self._cache_file = open(os.path.join(
+                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+                cache_filename), 'a+')
+            self._cache_filename = self._cache_file.name
+            self._lock_file(self._cache_file)
+            self._cache_file.seek(0)
+            with self._state_lock:
+                try:
+                    self._state = json.load(self._cache_file)
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache at least partially incomplete, set up new cache
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
+                except ValueError:
+                    # Cache file empty, set up new cache
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
+            # Load how many bytes were uploaded on previous run
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._my_collection())
+        # No resume required
+        else:
+            with self._state_lock:
+                self._state = copy.deepcopy(self.EMPTY_STATE)
+
+    def _lock_file(self, fileobj):
+        try:
+            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except IOError:
+            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+    def _save_state(self):
+        """
+        Atomically save current state into cache.
+        """
+        try:
+            with self._state_lock:
+                state = self._state
+            new_cache_fd, new_cache_name = tempfile.mkstemp(
+                dir=os.path.dirname(self._cache_filename))
+            self._lock_file(new_cache_fd)
+            new_cache = os.fdopen(new_cache_fd, 'r+')
+            json.dump(state, new_cache)
+            new_cache.flush()
+            os.fsync(new_cache)
+            os.rename(new_cache_name, self._cache_filename)
+        except (IOError, OSError, ResumeCacheConflict) as error:
+            self.logger.error("There was a problem while saving the cache file: {}".format(error))
+            try:
+                os.unlink(new_cache_name)
+            except NameError:  # mkstemp failed.
+                pass
+        else:
+            self._cache_file.close()
+            self._cache_file = new_cache
+
+    def collection_name(self):
+        with self._collection_lock:
+            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
+        return name
+
+    def manifest_locator(self):
+        with self._collection_lock:
+            locator = self._my_collection().manifest_locator()
+        return locator
+
+    def portable_data_hash(self):
+        with self._collection_lock:
+            datahash = self._my_collection().portable_data_hash()
+        return datahash
+
+    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+        with self._collection_lock:
+            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
+        return manifest
+
+    def _datablocks_on_item(self, item):
+        """
+        Return a list of datablock locators, recursively navigating
+        through subcollections
+        """
+        if isinstance(item, arvados.arvfile.ArvadosFile):
+            if item.size() == 0:
+                # Empty file locator
+                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+            else:
+                locators = []
+                for segment in item.segments():
+                    loc = segment.locator
+                    locators.append(loc)
+                return locators
+        elif isinstance(item, arvados.collection.Collection):
+            l = [self._datablocks_on_item(x) for x in item.values()]
+            # Fast list flattener method taken from:
+            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
+            return [loc for sublist in l for loc in sublist]
+        else:
+            return None
+
+    def data_locators(self):
+        with self._collection_lock:
+            # Make sure all datablocks are flushed before getting the locators
+            self._my_collection().manifest_text()
+            datablocks = self._datablocks_on_item(self._my_collection())
+        return datablocks
 
 
 def expected_bytes_for(pathlist):
@@ -348,26 +656,24 @@ def progress_writer(progress_func, outfile=sys.stderr):
 def exit_signal_handler(sigcode, frame):
     sys.exit(-sigcode)
 
-def check_project_exists(api_client, project_uuid):
-    if project_uuid:
-        if arvados.util.user_uuid_pattern.match(project_uuid):
-            api_client.users().get(uuid=project_uuid).execute()
-            return project_uuid
-        elif arvados.util.group_uuid_pattern.match(project_uuid):
-            api_client.groups().get(uuid=project_uuid).execute()
-            return project_uuid
-        else:
-            raise Exception("Not a valid project uuid: {}".format(project_uuid))
+def desired_project_uuid(api_client, project_uuid, num_retries):
+    if not project_uuid:
+        query = api_client.users().current()
+    elif arvados.util.user_uuid_pattern.match(project_uuid):
+        query = api_client.users().get(uuid=project_uuid)
+    elif arvados.util.group_uuid_pattern.match(project_uuid):
+        query = api_client.groups().get(uuid=project_uuid)
     else:
-        return api_client.users().current().execute()['uuid']
+        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
+    return query.execute(num_retries=num_retries)['uuid']
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
-    if api_client is None:
-        api_client = arvados.api('v1')
-    status = 0
 
     args = parse_arguments(arguments)
+    status = 0
+    if api_client is None:
+        api_client = arvados.api('v1')
 
     # Determine the name to use
     if args.name:
@@ -387,9 +693,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     # Determine the parent project
     try:
-        project_uuid = check_project_exists(api_client, args.project_uuid)
-    except Exception as error:
-        print >>stderr, "Project {} not found: {}".format(args.project_uuid, error)
+        project_uuid = desired_project_uuid(api_client, args.project_uuid,
+                                            args.retries)
+    except (apiclient_errors.Error, ValueError) as error:
+        print >>stderr, error
         sys.exit(1)
 
     if args.progress:
@@ -398,83 +705,69 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         reporter = progress_writer(machine_progress)
     else:
         reporter = None
-    bytes_expected = expected_bytes_for(args.paths)
 
-    resume_cache = None
-    if args.resume:
-        try:
-            resume_cache = ResumeCache(ResumeCache.make_path(args))
-        except (IOError, OSError, ValueError):
-            pass  # Couldn't open cache directory/file.  Continue without it.
-        except ResumeCacheConflict:
-            print >>stderr, "\n".join([
-                "arv-put: Another process is already uploading this data.",
-                "         Use --no-resume if this is really what you want."])
-            sys.exit(1)
-
-    if resume_cache is None:
-        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
-    else:
-        writer = ArvPutCollectionWriter.from_cache(
-            resume_cache, reporter, bytes_expected)
+    bytes_expected = expected_bytes_for(args.paths)
+    try:
+        writer = ArvPutUploadJob(paths = args.paths,
+                                 resume = args.resume,
+                                 filename = args.filename,
+                                 reporter = reporter,
+                                 bytes_expected = bytes_expected,
+                                 num_retries = args.retries,
+                                 replication_desired = args.replication,
+                                 name = collection_name,
+                                 owner_uuid = project_uuid,
+                                 ensure_unique_name = True)
+    except ResumeCacheConflict:
+        print >>stderr, "\n".join([
+            "arv-put: Another process is already uploading this data.",
+            "         Use --no-resume if this is really what you want."])
+        sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if writer.bytes_written > 0:  # We're resuming a previous upload.
+    if args.resume and writer.bytes_written > 0:
         print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
 
     writer.report_progress()
-    writer.do_queued_work()  # Do work resumed from cache.
-    for path in args.paths:  # Copy file data to Keep.
-        if os.path.isdir(path):
-            writer.write_directory_tree(
-                path, max_manifest_depth=args.max_manifest_depth)
-        else:
-            writer.start_new_stream()
-            writer.write_file(path, args.filename or os.path.basename(path))
-    writer.finish_current_stream()
-
+    output = None
+    writer.start()
     if args.progress:  # Print newline to split stderr from stdout for humans.
         print >>stderr
 
     if args.stream:
-        output = writer.manifest_text()
+        if args.normalize:
+            output = writer.manifest_text(normalize=True)
+        else:
+            output = writer.manifest_text()
     elif args.raw:
         output = ','.join(writer.data_locators())
     else:
         try:
-            # Register the resulting collection in Arvados.
-            collection = api_client.collections().create(
-                body={
-                    'owner_uuid': project_uuid,
-                    'name': collection_name,
-                    'manifest_text': writer.manifest_text()
-                    },
-                ensure_unique_name=True
-                ).execute()
-
-            print >>stderr, "Collection saved as '%s'" % collection['name']
-
-            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
-                output = collection['portable_data_hash']
+            writer.save_collection()
+            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+            if args.portable_data_hash:
+                output = writer.portable_data_hash()
             else:
-                output = collection['uuid']
-
-        except apiclient.errors.Error as error:
+                output = writer.manifest_locator()
+        except apiclient_errors.Error as error:
             print >>stderr, (
-                "arv-put: Error adding Collection to project: {}.".format(
+                "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
 
     # Print the locator (uuid) of the new collection.
-    stdout.write(output)
-    if not output.endswith('\n'):
-        stdout.write('\n')
+    if output is None:
+        status = status or 1
+    else:
+        stdout.write(output)
+        if not output.endswith('\n'):
+            stdout.write('\n')
 
     for sigcode, orig_handler in orig_signal_handlers.items():
         signal.signal(sigcode, orig_handler)
@@ -482,10 +775,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     if status != 0:
         sys.exit(status)
 
-    if resume_cache is not None:
-        resume_cache.destroy()
-
+    # Success!
+    writer.destroy_cache()
     return output
 
+
 if __name__ == '__main__':
     main()
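
For reference, a minimal sketch (not part of this commit) of how the new ArvPutUploadJob class added above can be driven programmatically, mirroring what main() does. The example path, collection name, and replication level are illustrative, and error handling (e.g. ResumeCacheConflict) is omitted.

    import sys
    import arvados.commands.put as arv_put

    paths = ['./mydata']                          # hypothetical local directory
    writer = arv_put.ArvPutUploadJob(
        paths=paths,
        resume=True,                              # reuse the cache under ~/.cache/arvados/arv-put
        reporter=None,                            # or a callable taking (bytes_written, bytes_expected)
        bytes_expected=arv_put.expected_bytes_for(paths),
        name='my collection',                     # illustrative collection name
        ensure_unique_name=True,
        replication_desired=2)
    writer.start()                                # upload the files, checkpointing every 60 seconds by default
    writer.save_collection()                      # register the resulting collection in Arvados
    sys.stdout.write(writer.manifest_locator() + '\n')   # or writer.portable_data_hash()
    writer.destroy_cache()                        # drop the resume cache after a successful run

From the command line, the options introduced in this commit would be exercised with something like: arv-put --replication 2 --normalize mydir/ (directory name illustrative).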