10111: Merge branch 'master' into 10111-cr-provenance-graph
[arvados.git] / sdk / python / arvados / collection.py
index ea18123f65474ff67bd6b4c137ecb15208694f34..f26d3a3d27c0b221d269d3d4a1beb8775268f7e6 100644 (file)
@@ -3,17 +3,24 @@ import logging
 import os
 import re
 import errno
 import os
 import re
 import errno
+import hashlib
+import time
+import threading
 
 from collections import deque
 from stat import *
 
 
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
-from keep import *
-from .stream import StreamReader, normalize_stream, locator_block_size
-from .ranges import Range, LocatorAndRange
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from keep import KeepLocator, KeepClient
+from .stream import StreamReader
+from ._normalize_stream import normalize_stream
+from ._ranges import Range, LocatorAndRange
+from .safeapi import ThreadSafeApiCache
 import config
 import errors
 import util
 import config
 import errors
 import util
+import events
+from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
 
 _logger = logging.getLogger('arvados.collection')
 
@@ -31,7 +38,8 @@ class CollectionBase(object):
         return self._keep_client
 
     def stripped_manifest(self):
         return self._keep_client
 
     def stripped_manifest(self):
-        """
+        """Get the manifest with locator hints stripped.
+
         Return the manifest for the current collection with all
         non-portable hints (i.e., permission signatures and other
         hints other than size hints) removed from the locators.
         Return the manifest for the current collection with all
         non-portable hints (i.e., permission signatures and other
         hints other than size hints) removed from the locators.
@@ -50,197 +58,7 @@ class CollectionBase(object):
         return ''.join(clean)
 
 
         return ''.join(clean)
 
 
-class CollectionReader(CollectionBase):
-    def __init__(self, manifest_locator_or_text, api_client=None,
-                 keep_client=None, num_retries=0):
-        """Instantiate a CollectionReader.
-
-        This class parses Collection manifests to provide a simple interface
-        to read its underlying files.
-
-        Arguments:
-        * manifest_locator_or_text: One of a Collection UUID, portable data
-          hash, or full manifest text.
-        * api_client: The API client to use to look up Collections.  If not
-          provided, CollectionReader will build one from available Arvados
-          configuration.
-        * keep_client: The KeepClient to use to download Collection data.
-          If not provided, CollectionReader will build one from available
-          Arvados configuration.
-        * num_retries: The default number of times to retry failed
-          service requests.  Default 0.  You may change this value
-          after instantiation, but note those changes may not
-          propagate to related objects like the Keep client.
-        """
-        self._api_client = api_client
-        self._keep_client = keep_client
-        self.num_retries = num_retries
-        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
-            self._manifest_locator = manifest_locator_or_text
-            self._manifest_text = None
-        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
-            self._manifest_locator = manifest_locator_or_text
-            self._manifest_text = None
-        elif re.match(util.manifest_pattern, manifest_locator_or_text):
-            self._manifest_text = manifest_locator_or_text
-            self._manifest_locator = None
-        else:
-            raise errors.ArgumentError(
-                "Argument to CollectionReader must be a manifest or a collection UUID")
-        self._api_response = None
-        self._streams = None
-
-    def _populate_from_api_server(self):
-        # As in KeepClient itself, we must wait until the last
-        # possible moment to instantiate an API client, in order to
-        # avoid tripping up clients that don't have access to an API
-        # server.  If we do build one, make sure our Keep client uses
-        # it.  If instantiation fails, we'll fall back to the except
-        # clause, just like any other Collection lookup
-        # failure. Return an exception, or None if successful.
-        try:
-            if self._api_client is None:
-                self._api_client = arvados.api('v1')
-                self._keep_client = None  # Make a new one with the new api.
-            self._api_response = self._api_client.collections().get(
-                uuid=self._manifest_locator).execute(
-                num_retries=self.num_retries)
-            self._manifest_text = self._api_response['manifest_text']
-            return None
-        except Exception as e:
-            return e
-
-    def _populate_from_keep(self):
-        # Retrieve a manifest directly from Keep. This has a chance of
-        # working if [a] the locator includes a permission signature
-        # or [b] the Keep services are operating in world-readable
-        # mode. Return an exception, or None if successful.
-        try:
-            self._manifest_text = self._my_keep().get(
-                self._manifest_locator, num_retries=self.num_retries)
-        except Exception as e:
-            return e
-
-    def _populate(self):
-        error_via_api = None
-        error_via_keep = None
-        should_try_keep = ((self._manifest_text is None) and
-                           util.keep_locator_pattern.match(
-                self._manifest_locator))
-        if ((self._manifest_text is None) and
-            util.signed_locator_pattern.match(self._manifest_locator)):
-            error_via_keep = self._populate_from_keep()
-        if self._manifest_text is None:
-            error_via_api = self._populate_from_api_server()
-            if error_via_api is not None and not should_try_keep:
-                raise error_via_api
-        if ((self._manifest_text is None) and
-            not error_via_keep and
-            should_try_keep):
-            # Looks like a keep locator, and we didn't already try keep above
-            error_via_keep = self._populate_from_keep()
-        if self._manifest_text is None:
-            # Nothing worked!
-            raise arvados.errors.NotFoundError(
-                ("Failed to retrieve collection '{}' " +
-                 "from either API server ({}) or Keep ({})."
-                 ).format(
-                    self._manifest_locator,
-                    error_via_api,
-                    error_via_keep))
-        self._streams = [sline.split()
-                         for sline in self._manifest_text.split("\n")
-                         if sline]
-
-    def _populate_first(orig_func):
-        # Decorator for methods that read actual Collection data.
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            if self._streams is None:
-                self._populate()
-            return orig_func(self, *args, **kwargs)
-        return wrapper
-
-    @_populate_first
-    def api_response(self):
-        """api_response() -> dict or None
-
-        Returns information about this Collection fetched from the API server.
-        If the Collection exists in Keep but not the API server, currently
-        returns None.  Future versions may provide a synthetic response.
-        """
-        return self._api_response
-
-    @_populate_first
-    def normalize(self):
-        # Rearrange streams
-        streams = {}
-        for s in self.all_streams():
-            for f in s.all_files():
-                streamname, filename = split(s.name() + "/" + f.name())
-                if streamname not in streams:
-                    streams[streamname] = {}
-                if filename not in streams[streamname]:
-                    streams[streamname][filename] = []
-                for r in f.segments:
-                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
-
-        self._streams = [normalize_stream(s, streams[s])
-                         for s in sorted(streams)]
-
-        # Regenerate the manifest text based on the normalized streams
-        self._manifest_text = ''.join(
-            [StreamReader(stream, keep=self._my_keep()).manifest_text()
-             for stream in self._streams])
-
-    @_populate_first
-    def open(self, streampath, filename=None):
-        """open(streampath[, filename]) -> file-like object
-
-        Pass in the path of a file to read from the Collection, either as a
-        single string or as two separate stream name and file name arguments.
-        This method returns a file-like object to read that file.
-        """
-        if filename is None:
-            streampath, filename = split(streampath)
-        keep_client = self._my_keep()
-        for stream_s in self._streams:
-            stream = StreamReader(stream_s, keep_client,
-                                  num_retries=self.num_retries)
-            if stream.name() == streampath:
-                break
-        else:
-            raise ValueError("stream '{}' not found in Collection".
-                             format(streampath))
-        try:
-            return stream.files()[filename]
-        except KeyError:
-            raise ValueError("file '{}' not found in Collection stream '{}'".
-                             format(filename, streampath))
-
-    @_populate_first
-    def all_streams(self):
-        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
-                for s in self._streams]
-
-    def all_files(self):
-        for s in self.all_streams():
-            for f in s.all_files():
-                yield f
-
-    @_populate_first
-    def manifest_text(self, strip=False, normalize=False):
-        if normalize:
-            cr = CollectionReader(self.manifest_text())
-            cr.normalize()
-            return cr.manifest_text(strip=strip, normalize=False)
-        elif strip:
-            return self.stripped_manifest()
-        else:
-            return self._manifest_text
-
-
-class _WriterFile(ArvadosFileBase):
+class _WriterFile(_FileLikeObjectBase):
     def __init__(self, coll_writer, name):
         super(_WriterFile, self).__init__(name, 'wb')
         self.dest = coll_writer
     def __init__(self, coll_writer, name):
         super(_WriterFile, self).__init__(name, 'wb')
         self.dest = coll_writer
@@ -249,22 +67,22 @@ class _WriterFile(ArvadosFileBase):
         super(_WriterFile, self).close()
         self.dest.finish_current_file()
 
         super(_WriterFile, self).close()
         self.dest.finish_current_file()
 
-    @ArvadosFileBase._before_close
+    @_FileLikeObjectBase._before_close
     def write(self, data):
         self.dest.write(data)
 
     def write(self, data):
         self.dest.write(data)
 
-    @ArvadosFileBase._before_close
+    @_FileLikeObjectBase._before_close
     def writelines(self, seq):
         for data in seq:
             self.write(data)
 
     def writelines(self, seq):
         for data in seq:
             self.write(data)
 
-    @ArvadosFileBase._before_close
+    @_FileLikeObjectBase._before_close
     def flush(self):
         self.dest.flush_data()
 
 
 class CollectionWriter(CollectionBase):
     def flush(self):
         self.dest.flush_data()
 
 
 class CollectionWriter(CollectionBase):
-    def __init__(self, api_client=None, num_retries=0):
+    def __init__(self, api_client=None, num_retries=0, replication=None):
         """Instantiate a CollectionWriter.
 
         CollectionWriter lets you build a new Arvados Collection from scratch.
         """Instantiate a CollectionWriter.
 
         CollectionWriter lets you build a new Arvados Collection from scratch.
@@ -280,9 +98,13 @@ class CollectionWriter(CollectionBase):
           service requests.  Default 0.  You may change this value
           after instantiation, but note those changes may not
           propagate to related objects like the Keep client.
           service requests.  Default 0.  You may change this value
           after instantiation, but note those changes may not
           propagate to related objects like the Keep client.
+        * replication: The number of copies of each block to store.
+          If this argument is None or not supplied, replication is
+          the server-provided default if available, otherwise 2.
         """
         self._api_client = api_client
         self.num_retries = num_retries
         """
         self._api_client = api_client
         self.num_retries = num_retries
+        self.replication = (2 if replication is None else replication)
         self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
         self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
@@ -437,7 +259,9 @@ class CollectionWriter(CollectionBase):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer:
             self._current_stream_locators.append(
         data_buffer = ''.join(self._data_buffer)
         if data_buffer:
             self._current_stream_locators.append(
-                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
+                self._my_keep().put(
+                    data_buffer[0:config.KEEP_BLOCK_SIZE],
+                    copies=self.replication))
             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
@@ -483,7 +307,8 @@ class CollectionWriter(CollectionBase):
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace")
+                "Manifest stream names cannot contain whitespace: '%s'" %
+                (newstreamname))
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
@@ -512,8 +337,16 @@ class CollectionWriter(CollectionBase):
         self._current_file_name = None
 
     def finish(self):
         self._current_file_name = None
 
     def finish(self):
-        # Store the manifest in Keep and return its locator.
-        return self._my_keep().put(self.manifest_text())
+        """Store the manifest in Keep and return its locator.
+
+        This is useful for storing manifest fragments (task outputs)
+        temporarily in Keep during a Crunch job.
+
+        In other cases you should make a collection instead, by
+        sending manifest_text() to the API server's "create
+        collection" endpoint.
+        """
+        return self._my_keep().put(self.manifest_text(), copies=self.replication)
 
     def portable_data_hash(self):
         stripped = self.stripped_manifest()
 
     def portable_data_hash(self):
         stripped = self.stripped_manifest()
@@ -547,10 +380,9 @@ class ResumableCollectionWriter(CollectionWriter):
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
-    def __init__(self, api_client=None, num_retries=0):
+    def __init__(self, api_client=None, **kwargs):
         self._dependencies = {}
         self._dependencies = {}
-        super(ResumableCollectionWriter, self).__init__(
-            api_client, num_retries=num_retries)
+        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):
@@ -640,20 +472,754 @@ class ResumableCollectionWriter(CollectionWriter):
         return super(ResumableCollectionWriter, self).write(data)
 
 
         return super(ResumableCollectionWriter, self).write(data)
 
 
-class Collection(CollectionBase):
-    def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None,
-                 keep_client=None, num_retries=0, block_manager=None):
+# Event codes passed as the first argument to RichCollectionBase.notify().
+ADD = "add"
+DEL = "del"
+MOD = "mod"
+# NOTE(review): TOK is not referenced in the visible part of this file;
+# presumably another notify() event code -- confirm against callers.
+TOK = "tok"
+# Values accepted by the `create_type` argument of find_or_create().
+FILE = "file"
+COLLECTION = "collection"
 
 
+class RichCollectionBase(CollectionBase):
+    """Base class for Collections and Subcollections.
+
+    Implements the majority of functionality relating to accessing items in the
+    Collection.
+
+    """
+
+    def __init__(self, parent=None):
         self.parent = parent
         self.parent = parent
-        self._items = None
+        self._committed = False
+        self._callback = None
+        self._items = {}
+
+    def _my_api(self):
+        # Abstract hook: this base implementation always raises, so
+        # subclasses must override it.
+        # NOTE(review): presumably returns the API client -- confirm in subclasses.
+        raise NotImplementedError()
+
+    def _my_keep(self):
+        # Abstract hook: this base implementation always raises, so
+        # subclasses must override it.
+        # NOTE(review): presumably returns the Keep client -- confirm in subclasses.
+        raise NotImplementedError()
+
+    def _my_block_manager(self):
+        # Abstract hook: subclasses must override.  manifest_text() calls
+        # commit_all() on whatever this returns.
+        raise NotImplementedError()
+
+    def writable(self):
+        # Abstract hook: subclasses must override.  open() and rename() treat
+        # a false result as "read only" and raise IOError(EROFS).
+        raise NotImplementedError()
+
+    def root_collection(self):
+        # Abstract hook: this base implementation always raises, so
+        # subclasses must override it.
+        raise NotImplementedError()
+
+    def notify(self, event, collection, name, item):
+        # Abstract hook: subclasses must override.  Within this class it is
+        # called after every change to self._items with:
+        #   event      -- ADD, DEL or MOD
+        #   collection -- the collection whose contents changed
+        #   name       -- name of the affected entry
+        #   item       -- the new item (ADD), the removed item or None (DEL),
+        #                 or an (old_item, new_item) tuple (MOD)
+        raise NotImplementedError()
+
+    def stream_name(self):
+        # Abstract hook: this base implementation always raises, so
+        # subclasses must override it.
+        raise NotImplementedError()
+
+    @must_be_writable
+    @synchronized
+    def find_or_create(self, path, create_type):
+        """Return the item at `path`, creating whatever is missing.
+
+        Walks `path` one component at a time.  Missing intermediate
+        components are created as subcollections; a missing final component
+        is created according to `create_type`.  Every creation marks this
+        collection as uncommitted and sends an ADD notification.
+
+        :create_type:
+          One of `arvados.collection.FILE` or
+          `arvados.collection.COLLECTION`.  When the last path component does
+          not exist, COLLECTION creates a new subcollection there and any
+          other value creates a new ArvadosFile.
+
+        Returns the existing or newly created item.  If the first component
+        of `path` is empty (for example an empty path), returns `self`.
+
+        Raises IOError (ENOTDIR) if the path descends through an item that
+        is not a collection.
+
+        """
+
+        parts = path.split("/", 1)
+        name = parts[0]
+        if not name:
+            return self
+
+        is_leaf = (len(parts) == 1)
+        item = self._items.get(name)
+        if item is None:
+            # Only a missing leaf may become a file; anything that still has
+            # path components below it has to be a subcollection.
+            if is_leaf and create_type != COLLECTION:
+                item = ArvadosFile(self, name)
+            else:
+                item = Subcollection(self, name)
+            self._items[name] = item
+            self.set_committed(False)
+            self.notify(ADD, self, name, item)
+
+        if is_leaf:
+            return item
+        if isinstance(item, RichCollectionBase):
+            return item.find_or_create(parts[1], create_type)
+        raise IOError(errno.ENOTDIR, "Not a directory", name)
+
+    @synchronized
+    def find(self, path):
+        """Look up the item at a slash-separated path.
+
+        Returns the ArvadosFile or collection found at `path`, or None if
+        any component along the way does not exist.  A trailing slash after
+        a subcollection name returns that subcollection.
+
+        Raises `errors.ArgumentError` if `path` is empty, and IOError
+        (ENOTDIR) if `path` starts with '/' or descends through an item that
+        is not a collection.
+
+        """
+        if not path:
+            raise errors.ArgumentError("Parameter 'path' is empty.")
+
+        parts = path.split("/", 1)
+        name = parts[0]
+        if name == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", name)
+
+        item = self._items.get(name)
+        if item is None or len(parts) == 1:
+            # Either nothing is there (None) or this was the last component.
+            return item
+        if not isinstance(item, RichCollectionBase):
+            raise IOError(errno.ENOTDIR, "Not a directory", name)
+
+        remainder = parts[1]
+        if remainder:
+            return item.find(remainder)
+        return item
+
+    @synchronized
+    def mkdirs(self, path):
+        """Recursive subcollection create.
+
+        Like `os.makedirs()`.  Will create intermediate subcollections needed
+        to contain the leaf subcollection path.
+
+        Returns the subcollection created at `path`.
+
+        Raises IOError (EEXIST) if anything already exists at `path`; errors
+        raised by `find()` and `find_or_create()` propagate unchanged.
+
+        """
+
+        # Identity test: an existing item must never be mistaken for "absent"
+        # because of how it implements comparison operators.
+        if self.find(path) is not None:
+            raise IOError(errno.EEXIST, "Directory or file exists", path)
+
+        return self.find_or_create(path, COLLECTION)
+
+    def open(self, path, mode="r"):
+        """Open a file-like object for access.
+
+        :path:
+          path to a file in the collection
+        :mode:
+          one of "r", "r+", "w", "w+", "a", "a+"
+          :"r":
+            opens for reading
+          :"r+":
+            opens for reading and writing.  Reads/writes share a file pointer.
+          :"w", "w+":
+            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
+          :"a", "a+":
+            opens for reading and writing.  All writes are appended to
+            the end of the file.  Writing does not affect the file pointer for
+            reading.
+
+        Any "b" in `mode` is ignored.  Every mode other than plain "r"
+        creates the file (and intermediate subcollections) if it is missing.
+
+        Returns an ArvadosFileReader for mode "r", otherwise an
+        ArvadosFileWriter.
+
+        Raises `errors.ArgumentError` for a mode that does not start with
+        "r", "w" or "a"; IOError (EROFS) when a writing mode is requested on
+        a collection that is not writable; IOError (ENOENT) when the file
+        does not exist in mode "r"; IOError (EISDIR) when `path` names a
+        collection.
+        """
+        mode = mode.replace("b", "")
+        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
+            raise errors.ArgumentError("Bad mode '%s'" % mode)
+        create = (mode != "r")
+
+        if create and not self.writable():
+            raise IOError(errno.EROFS, "Collection is read only")
+
+        if create:
+            arvfile = self.find_or_create(path, FILE)
+        else:
+            arvfile = self.find(path)
+
+        if arvfile is None:
+            raise IOError(errno.ENOENT, "File not found", path)
+        if not isinstance(arvfile, ArvadosFile):
+            raise IOError(errno.EISDIR, "Is a directory", path)
+
+        if mode[0] == "w":
+            arvfile.truncate(0)
+
+        if mode == "r":
+            return ArvadosFileReader(arvfile, num_retries=self.num_retries)
+        else:
+            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
+
+    def modified(self):
+        """Determine if the collection has been modified since last committed.
+
+        This is simply the negation of `committed()`.
+        """
+        return not self.committed()
+
+    @synchronized
+    def committed(self):
+        """Determine if the collection has been committed to the API server."""
+        # The flag itself is only ever changed through set_committed().
+        return self._committed
+
+    @synchronized
+    def set_committed(self, value=True):
+        """Set the committed flag, propagating it through the tree.
+
+        Setting True marks this collection and everything it contains as
+        committed.
+
+        Setting False marks this collection and each of its ancestors as
+        uncommitted.
+
+        Does nothing when the flag already has the requested value.
+        """
+        if value == self._committed:
+            return
+
+        if not value:
+            # A change down here makes every enclosing collection dirty too.
+            self._committed = False
+            if self.parent is not None:
+                self.parent.set_committed(False)
+            return
+
+        # Children first, then this collection.
+        for child in self._items.values():
+            child.set_committed(True)
+        self._committed = True
+
+    @synchronized
+    def __iter__(self):
+        """Iterate over names of files and collections contained in this collection."""
+        # NOTE(review): this file uses Python 2 idioms (e.g. basestring), where
+        # dict.keys() builds a separate list, so the iterator would be walking
+        # a snapshot of the names -- confirm before porting to Python 3.
+        return iter(self._items.keys())
+
+    @synchronized
+    def __getitem__(self, k):
+        """Get a file or collection that is directly contained by this collection.
+
+        If you want to search a path, use `find()` instead.
+
+        `k` is looked up as a single name in this collection's own items; it
+        is not split on "/".
+
+        """
+        return self._items[k]
+
+    @synchronized
+    def __contains__(self, k):
+        """Test if there is a file or collection directly contained by this collection."""
+        return k in self._items
+
+    @synchronized
+    def __len__(self):
+        """Get the number of items directly contained in this collection."""
+        # Because __len__ is defined, an empty collection evaluates as false
+        # in a boolean context; test "is not None" to check for presence.
+        return len(self._items)
+
+    @must_be_writable
+    @synchronized
+    def __delitem__(self, p):
+        """Delete an item by name which is directly contained by this collection.
+
+        Marks the collection as uncommitted and sends a DEL notification.
+        """
+        del self._items[p]
+        self.set_committed(False)
+        # Unlike remove(), the deleted item itself is not passed along here.
+        self.notify(DEL, self, p, None)
+
+    @synchronized
+    def keys(self):
+        """Get a list of names of files and collections directly contained in this collection."""
+        # NOTE(review): "list" holds for Python 2 dicts, which this file
+        # appears to target (it uses basestring) -- confirm before porting.
+        return self._items.keys()
+
+    @synchronized
+    def values(self):
+        """Get a list of files and collection objects directly contained in this collection."""
+        # NOTE(review): "list" holds for Python 2 dicts, which this file
+        # appears to target (it uses basestring) -- confirm before porting.
+        return self._items.values()
+
+    @synchronized
+    def items(self):
+        """Get a list of (name, object) tuples directly contained in this collection."""
+        # NOTE(review): "list" holds for Python 2 dicts, which this file
+        # appears to target (it uses basestring) -- confirm before porting.
+        return self._items.items()
+
+    def exists(self, path):
+        """Test if there is a file or collection at `path`.
+
+        Exceptions raised by `find()` are not caught here: an empty `path`
+        raises `errors.ArgumentError`, and a path that starts with '/' or
+        descends through a file raises IOError (ENOTDIR).
+        """
+        return self.find(path) is not None
+
+    @must_be_writable
+    @synchronized
+    def remove(self, path, recursive=False):
+        """Remove the file or subcollection (directory) at `path`.
+
+        :recursive:
+          Specify whether to remove non-empty subcollections (True), or raise an error (False).
+
+        On success the collection that directly contained the item is marked
+        as uncommitted and sends a DEL notification carrying the removed
+        item.
+
+        Raises `errors.ArgumentError` if `path` is empty, IOError (ENOENT) if
+        a path component does not exist, and IOError (ENOTEMPTY) if the
+        target is a non-empty subcollection and `recursive` is False.
+        """
+
+        if not path:
+            raise errors.ArgumentError("Parameter 'path' is empty.")
+
+        pathcomponents = path.split("/", 1)
+        item = self._items.get(pathcomponents[0])
+        if item is None:
+            raise IOError(errno.ENOENT, "File not found", path)
+        if len(pathcomponents) == 1:
+            if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
+                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
+            del self._items[pathcomponents[0]]
+            self.set_committed(False)
+            self.notify(DEL, self, pathcomponents[0], item)
+        else:
+            # Hand the caller's choice down the tree; otherwise a nested
+            # non-empty subcollection could never be removed with
+            # recursive=True.
+            item.remove(pathcomponents[1], recursive=recursive)
+
+    def _clonefrom(self, source):
+        # Fill self._items with clones of every entry in `source`: each item
+        # is asked to clone itself with this collection as the new parent,
+        # keeping its name.  Existing entries with the same name are replaced.
+        # No committed-flag change or notification happens here.
+        for k,v in source.items():
+            self._items[k] = v.clone(self, k)
+
+    def clone(self):
+        # Abstract hook: this base implementation always raises, so
+        # subclasses must override it.
+        # NOTE(review): _clonefrom() and add() call item.clone(new_parent,
+        # new_name) on contained items, so overrides presumably take those
+        # two arguments -- confirm against the subclasses.
+        raise NotImplementedError()
+
+    @must_be_writable
+    @synchronized
+    def add(self, source_obj, target_name, overwrite=False, reparent=False):
+        """Copy or move a file or subcollection to this collection.
+
+        :source_obj:
+          An ArvadosFile, or Subcollection object
+
+        :target_name:
+          Destination item name.  If the target name already exists, this
+          will raise an error unless you specify `overwrite=True`.
+
+        :overwrite:
+          Whether to overwrite the target if it already exists.
+
+        :reparent:
+          If True, source_obj will be moved from its parent collection to this collection.
+          If False, source_obj will be copied and the parent collection will be
+          unmodified.
+
+        Marks this collection as uncommitted, then sends a MOD notification
+        carrying an (old_item, new_item) tuple if an existing item was
+        replaced, or an ADD notification carrying the new item otherwise.
+
+        Raises IOError (EEXIST) if `target_name` exists and `overwrite` is
+        False.
+
+        """
+
+        if target_name in self and not overwrite:
+            raise IOError(errno.EEXIST, "File already exists", target_name)
+
+        modified_from = None
+        if target_name in self:
+            modified_from = self[target_name]
+
+        # Actually make the move or copy.
+        if reparent:
+            source_obj._reparent(self, target_name)
+            item = source_obj
+        else:
+            item = source_obj.clone(self, target_name)
+
+        self._items[target_name] = item
+        self.set_committed(False)
+
+        # Compare against None explicitly: collections define __len__, so an
+        # empty subcollection being replaced is falsy and would otherwise be
+        # reported as ADD instead of MOD.
+        if modified_from is not None:
+            self.notify(MOD, self, target_name, (modified_from, item))
+        else:
+            self.notify(ADD, self, target_name, item)
+
+    def _get_src_target(self, source, target_path, source_collection, create_dest):
+        """Resolve the arguments of copy()/rename() into concrete objects.
+
+        :source:
+          A path string, looked up in `source_collection`, or an object that
+          is used as-is.
+
+        :target_path:
+          Destination path in this collection.  If its last component is
+          empty (trailing slash or empty string), the last component of a
+          string `source` is used as the name instead.
+
+        :source_collection:
+          Collection in which to look up a string `source` (default `self`).
+
+        :create_dest:
+          If True, the target's parent directory is obtained with
+          `find_or_create()`; if False it must already exist.
+
+        Returns a `(source_obj, target_dir, target_name)` tuple.  When the
+        target names an existing subcollection and `source` was given as a
+        path, the item is placed inside that subcollection under the
+        source's own name.
+
+        Raises IOError (ENOENT) if the source or the target directory is not
+        found, and `errors.ArgumentError` if no target name can be
+        determined.
+
+        """
+        if source_collection is None:
+            source_collection = self
+
+        # Find the object
+        if isinstance(source, basestring):
+            source_obj = source_collection.find(source)
+            if source_obj is None:
+                raise IOError(errno.ENOENT, "File not found", source)
+            sourcecomponents = source.split("/")
+        else:
+            source_obj = source
+            sourcecomponents = None
+
+        # Find the parent collection of the target path
+        targetcomponents = target_path.split("/")
+
+        # Determine the name to use.  sourcecomponents is None when the
+        # source was passed as an object, so it must not be indexed blindly:
+        # that would raise TypeError instead of the ArgumentError below.
+        if targetcomponents[-1]:
+            target_name = targetcomponents[-1]
+        elif sourcecomponents:
+            target_name = sourcecomponents[-1]
+        else:
+            target_name = None
+
+        if not target_name:
+            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+
+        if create_dest:
+            # With a single-component target this joins to "", for which
+            # find_or_create() returns this collection itself.
+            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        else:
+            if len(targetcomponents) > 1:
+                target_dir = self.find("/".join(targetcomponents[0:-1]))
+            else:
+                target_dir = self
+
+        if target_dir is None:
+            raise IOError(errno.ENOENT, "Target directory not found", target_name)
+
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
+
+        return (source_obj, target_dir, target_name)
+
+    @must_be_writable
+    @synchronized
+    def copy(self, source, target_path, source_collection=None, overwrite=False):
+        """Copy a file or subcollection to a new path in this collection.
+
+        :source:
+          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
+
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.
+
+        :source_collection:
+          Collection to copy `source` from (default `self`)
+
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+        """
+
+        # create_dest=True: missing parent directories of the target are created.
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+        # reparent=False: the source is cloned and left where it is.
+        target_dir.add(source_obj, target_name, overwrite, False)
+
+    @must_be_writable
+    @synchronized
+    def rename(self, source, target_path, source_collection=None, overwrite=False):
+        """Move a file or subcollection from `source_collection` to a new path in this collection.
+
+        :source:
+          A string with a path to source file or subcollection.
+
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.
+
+        :source_collection:
+          Collection to move `source` from (default `self`)
+
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+
+        Raises IOError (EROFS) if the source object is not writable.
+        """
+
+        # create_dest=False: unlike copy(), the target's parent directory
+        # must already exist.
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+        if not source_obj.writable():
+            raise IOError(errno.EROFS, "Source collection is read only", source)
+        # reparent=True: the source object itself is moved, not cloned.
+        target_dir.add(source_obj, target_name, overwrite, True)
+
+    def portable_manifest_text(self, stream_name="."):
+        """Get the manifest text for this collection, sub collections and files.
+
+        This method does not flush outstanding blocks to Keep.  It will return
+        a normalized manifest with access tokens stripped.
+
+        :stream_name:
+          Name to use for this stream (directory)
+
+        """
+        # The two positional True values are `strip` and `normalize`.
+        return self._get_manifest_text(stream_name, True, True)
+
+    @synchronized
+    def manifest_text(self, stream_name=".", strip=False, normalize=False,
+                      only_committed=False):
+        """Get the manifest text for this collection, sub collections and files.
+
+        Unless `only_committed` is True, this method will flush outstanding
+        blocks to Keep first.  By default, it will not normalize an unmodified
+        manifest or strip access tokens.
+
+        :stream_name:
+          Name to use for this stream (directory)
+
+        :strip:
+          If True, remove signing tokens from block locators if present.
+          If False (default), block locators are left unchanged.
+
+        :normalize:
+          If True, always export the manifest text in normalized form
+          even if the Collection is not modified.  If False (default) and the collection
+          is not modified, return the original manifest text even if it is not
+          in normalized form.
+
+        :only_committed:
+          If True, don't commit pending blocks.
+
+        """
+
+        # Commit pending blocks first so the generated text can refer to them.
+        if not only_committed:
+            self._my_block_manager().commit_all()
+        return self._get_manifest_text(stream_name, strip, normalize,
+                                       only_committed=only_committed)
+
+    @synchronized
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
+        """Get the manifest text for this collection, sub collections and files.
+
+        Unlike `manifest_text`, this method never commits pending blocks
+        itself.
+
+        :stream_name:
+          Name to use for this stream (directory)
+
+        :strip:
+          If True, remove signing tokens from block locators if present.
+          If False, block locators are left unchanged.
+
+        :normalize:
+          If True, always export the manifest text in normalized form
+          even if the Collection is not modified.  If False and the collection
+          is not modified, return the original manifest text even if it is not
+          in normalized form.
+
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
+        """
+
+        # Regenerate the text when there are uncommitted changes, when no
+        # manifest text is stored, or when normalization was requested.
+        if not self.committed() or self._manifest_text is None or normalize:
+            stream = {}
+            buf = []
+            sorted_keys = sorted(self.keys())
+            # Files first: they all go into the single stream `stream_name`.
+            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
+                # Build the list of block ranges for file `filename`
+                arvfile = self[filename]
+                filestream = []
+                for segment in arvfile.segments():
+                    loc = segment.locator
+                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        # The segment refers to a buffer block: leave it out
+                        # when only committed data is wanted, otherwise use
+                        # the locator reported by the buffer block.
+                        if only_committed:
+                            continue
+                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
+                    if strip:
+                        loc = KeepLocator(loc).stripped()
+                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
+                                         segment.segment_offset, segment.range_size))
+                stream[filename] = filestream
+            if stream:
+                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
+            # Then subcollections, each emitted under `stream_name/dirname`
+            # and always in normalized form.
+            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
+                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
+            return "".join(buf)
+        else:
+            # Committed and not normalizing: reuse the stored manifest text.
+            if strip:
+                return self.stripped_manifest()
+            else:
+                return self._manifest_text
+
+    @synchronized
+    def diff(self, end_collection, prefix=".", holding_collection=None):
+        """Generate list of add/modify/delete actions.
+
+        When given to `apply`, will change `self` to match `end_collection`
+
+        :end_collection:
+          The collection whose contents are the desired end state.
+
+        :prefix:
+          Path joined in front of each item name in the reported changes.
+
+        :holding_collection:
+          Collection passed to `clone()` for every item recorded in the change
+          list.  If None, a new empty Collection is created for this purpose.
+
+        Returns a list of tuples: `(DEL, path, old_item)`,
+        `(ADD, path, new_item)`, `(MOD, path, old_item, new_item)` or
+        `(TOK, path, old_item, new_item)`.  TOK is emitted for items present
+        on both sides that compare equal.
+
+        """
+        changes = []
+        if holding_collection is None:
+            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
+        # Items only in self are deletions.
+        for k in self:
+            if k not in end_collection:
+               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
+        for k in end_collection:
+            if k in self:
+                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
+                    # Both sides are subcollections: recurse and report
+                    # changes for their contents.
+                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
+                elif end_collection[k] != self[k]:
+                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+                else:
+                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+            else:
+                # Items only in end_collection are additions.
+                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
+        return changes
+
+    @must_be_writable
+    @synchronized
+    def apply(self, changes):
+        """Apply changes from `diff`.
+
+        If a change conflicts with a local change, it will be saved to an
+        alternate path indicating the conflict.
+
+        :changes:
+          List of change tuples as produced by `diff`: the event type, the
+          path, the initial item, and (for MOD and TOK) the final item.
+
+        The conflict path has the form `<path>~<UTC timestamp>~conflict~`.
+        A non-empty change list marks this collection as not committed.
+
+        """
+        if changes:
+            self.set_committed(False)
+        for change in changes:
+            event_type = change[0]
+            path = change[1]
+            initial = change[2]
+            local = self.find(path)
+            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
+                                                                    time.gmtime()))
+            if event_type == ADD:
+                if local is None:
+                    # No local file at path, safe to copy over new file
+                    self.copy(initial, path)
+                elif local is not None and local != initial:
+                    # There is already local file and it is different:
+                    # save change to conflict file.
+                    self.copy(initial, conflictpath)
+                # else, the local item already equals the added item and
+                # nothing needs to be done.
+            elif event_type == MOD or event_type == TOK:
+                final = change[3]
+                if local == initial:
+                    # Local matches the "initial" item so it has not
+                    # changed locally and is safe to update.
+                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
+                        # Replace contents of local file with new contents
+                        local.replace_contents(final)
+                    else:
+                        # Overwrite path with new item; this can happen if
+                        # path was a file and is now a collection or vice versa
+                        self.copy(final, path, overwrite=True)
+                else:
+                    # Local is missing (presumably deleted) or local doesn't
+                    # match the "start" value, so save change to conflict file
+                    self.copy(final, conflictpath)
+            elif event_type == DEL:
+                if local == initial:
+                    # Local item matches "initial" value, so it is safe to remove.
+                    self.remove(path, recursive=True)
+                # else, the file is modified or already removed, in either
+                # case we don't want to try to remove it.
+
+    def portable_data_hash(self):
+        """Get the portable data hash for this collection's manifest.
+
+        Returns the stored hash when this collection has a manifest locator
+        and is committed; otherwise computes it from the portable manifest
+        text as `<md5 hex digest>+<length of the text>`.
+        """
+        if self._manifest_locator and self.committed():
+            # If the collection is already saved on the API server, and it's committed
+            # then return API server's PDH response.
+            return self._portable_data_hash
+        else:
+            stripped = self.portable_manifest_text()
+            return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+
+    @synchronized
+    def subscribe(self, callback):
+        """Register `callback` as this collection's single change callback.
+
+        The callback is invoked by `notify` with the arguments
+        `(event, collection, name, item)`.
+
+        Raises errors.ArgumentError if a callback is already set.
+        """
+        if self._callback is None:
+            self._callback = callback
+        else:
+            raise errors.ArgumentError("A callback is already set on this collection.")
+
+    @synchronized
+    def unsubscribe(self):
+        """Clear the callback set by `subscribe`; does nothing if none is set."""
+        if self._callback is not None:
+            self._callback = None
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        """Report a change to this collection's callback and to the root collection.
+
+        Calls the callback registered on this object (if any) with
+        `(event, collection, name, item)`, then forwards the same arguments
+        to `notify` on the object returned by `root_collection()`.
+        """
+        if self._callback:
+            self._callback(event, collection, name, item)
+        # NOTE(review): this relies on the root collection's notify() not
+        # forwarding to itself again -- confirm the root class overrides it.
+        self.root_collection().notify(event, collection, name, item)
+
+    @synchronized
+    def __eq__(self, other):
+        """Compare two collections by content.
+
+        Returns True when `other` is this object, or when it is a
+        RichCollectionBase with the same number of items, the same item names,
+        and equal items under each name.  Returns False otherwise.
+        """
+        if other is self:
+            return True
+        if not isinstance(other, RichCollectionBase):
+            return False
+        if len(self._items) != len(other):
+            return False
+        for k in self._items:
+            if k not in other:
+                return False
+            if self._items[k] != other[k]:
+                return False
+        return True
+
+    def __ne__(self, other):
+        """Return the negation of `__eq__`."""
+        return not self.__eq__(other)
+
+    @synchronized
+    def flush(self):
+        """Flush bufferblocks to Keep.
+
+        Calls `flush()` on every item directly contained in this collection.
+        """
+        for e in self.values():
+            e.flush()
+
+
+class Collection(RichCollectionBase):
+    """Represents the root of an Arvados Collection.
+
+    This class is threadsafe.  The root collection object, all subcollections
+    and files are protected by a single lock (i.e. each access locks the entire
+    collection).
+
+    Brief summary of
+    useful methods:
+
+    :To read an existing file:
+      `c.open("myfile", "r")`
+
+    :To write a new file:
+      `c.open("myfile", "w")`
+
+    :To determine if a file exists:
+      `c.find("myfile") is not None`
+
+    :To copy a file:
+      `c.copy("source", "dest")`
+
+    :To delete a file:
+      `c.remove("myfile")`
+
+    :To save to an existing collection record:
+      `c.save()`
+
+    :To save a new collection record:
+      `c.save_new()`
+
+    :To merge remote changes into this object:
+      `c.update()`
+
+    Must be associated with an API server Collection record (during
+    initialization, or using `save_new`) to use `save` or `update`
+
+    """
+
+    def __init__(self, manifest_locator_or_text=None,
+                 api_client=None,
+                 keep_client=None,
+                 num_retries=None,
+                 parent=None,
+                 apiconfig=None,
+                 block_manager=None,
+                 replication_desired=None,
+                 put_threads=None):
+        """Collection constructor.
+
+        :manifest_locator_or_text:
+          One of Arvados collection UUID, block locator of
+          a manifest, raw manifest text, or None (to create an empty collection).
+        :parent:
+          the parent Collection, may be None.
+
+        :apiconfig:
+          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
+          Prefer this over supplying your own api_client and keep_client (except in testing).
+          Will use default config settings if not specified.
+
+        :api_client:
+          The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
+        :keep_client:
+          the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
+        :num_retries:
+          the number of retries for API and Keep requests.
+
+        :block_manager:
+          the block manager to use.  If not specified, create one.
+
+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
+        """
+        super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
+        self.put_threads = put_threads
 
 
-        self.num_retries = num_retries
+        if apiconfig:
+            self._config = apiconfig
+        else:
+            self._config = config.settings()
+
+        self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
         self._manifest_locator = None
         self._manifest_text = None
+        self._portable_data_hash = None
         self._api_response = None
         self._api_response = None
+        self._past_versions = set()
+
+        self.lock = threading.RLock()
+        self.events = None
 
         if manifest_locator_or_text:
             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
 
         if manifest_locator_or_text:
             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
@@ -664,31 +1230,79 @@ class Collection(CollectionBase):
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
-                    "Argument to CollectionReader must be a manifest or a collection UUID")
+                    "Argument to CollectionReader is not a manifest or a collection UUID")
+
+            try:
+                self._populate()
+            except (IOError, errors.SyntaxError) as e:
+                raise errors.ArgumentError("Error processing manifest text: %s", e)
+
+    def root_collection(self):
+        """Return this object as the root collection."""
+        return self
+
+    def stream_name(self):
+        """Return the stream name of the root collection, which is always "."."""
+        return "."
+
+    def writable(self):
+        """Return True: a Collection always reports itself as writable."""
+        return True
+
+    @synchronized
+    def known_past_version(self, modified_at_and_portable_data_hash):
+        """Return True if the given version has been recorded for this collection.
+
+        :modified_at_and_portable_data_hash:
+          A `(modified_at, portable_data_hash)` tuple, in the same form as the
+          entries stored in `self._past_versions`.
+        """
+        return modified_at_and_portable_data_hash in self._past_versions
+
+    @synchronized
+    @retry_method
+    def update(self, other=None, num_retries=None):
+        """Merge the latest collection on the API server with the current collection.
+
+        :other:
+          Collection to merge into this one.  If None, the record identified
+          by this collection's manifest locator is fetched from the API server
+          and its manifest text is used instead.
+
+        :num_retries:
+          Retry count passed to the API request.
+
+        Raises errors.ArgumentError if `other` is None and this collection
+        has no manifest locator.
+
+        """
+
+        if other is None:
+            if self._manifest_locator is None:
+                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
+            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
+                response.get("portable_data_hash") != self.portable_data_hash()):
+                # The record on the server is different from our current one, but we've seen it before,
+                # so ignore it because it's already been merged.
+                # However, if it's the same as our current record, proceed with the update, because we want to update
+                # our tokens.
+                return
+            else:
+                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+            other = CollectionReader(response["manifest_text"])
+        # Compute the changes from the stored manifest text to `other`, apply
+        # them to this collection, then store the resulting manifest text.
+        baseline = CollectionReader(self._manifest_text)
+        self.apply(baseline.diff(other))
+        self._manifest_text = self.manifest_text()
 
 
+    @synchronized
     def _my_api(self):
         if self._api_client is None:
     def _my_api(self):
         if self._api_client is None:
-            if self.parent is not None:
-                return self.parent._my_api()
-            self._api_client = arvados.api('v1')
-            self._keep_client = None  # Make a new one with the new api.
+            self._api_client = ThreadSafeApiCache(self._config)
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
         return self._api_client
 
+    @synchronized
     def _my_keep(self):
         if self._keep_client is None:
     def _my_keep(self):
         if self._keep_client is None:
-            if self.parent is not None:
-                return self.parent._my_keep()
-            self._keep_client = KeepClient(api_client=self._my_api(),
-                                           num_retries=self.num_retries)
+            if self._api_client is None:
+                self._my_api()
+            else:
+                self._keep_client = KeepClient(api_client=self._api_client)
         return self._keep_client
 
         return self._keep_client
 
+    @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
     def _my_block_manager(self):
         if self._block_manager is None:
-            if self.parent is not None:
-                return self.parent._my_block_manager()
-            self._block_manager = BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
         return self._block_manager
 
         return self._block_manager
 
+    def _remember_api_response(self, response):
+        """Store an API server response and record its version as seen.
+
+        Saves `response` as `self._api_response` and adds its
+        `(modified_at, portable_data_hash)` pair to `self._past_versions`.
+        """
+        self._api_response = response
+        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -698,10 +1312,15 @@ class Collection(CollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            self._api_response = self._my_api().collections().get(
+            self._remember_api_response(self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
                 uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries)
+                    num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
             self._manifest_text = self._api_response['manifest_text']
+            self._portable_data_hash = self._api_response['portable_data_hash']
+            # If not overriden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
             return None
         except Exception as e:
             return e
@@ -718,14 +1337,13 @@ class Collection(CollectionBase):
             return e
 
     def _populate(self):
             return e
 
     def _populate(self):
-        self._items = {}
         if self._manifest_locator is None and self._manifest_text is None:
             return
         error_via_api = None
         error_via_keep = None
         should_try_keep = ((self._manifest_text is None) and
                            util.keep_locator_pattern.match(
         if self._manifest_locator is None and self._manifest_text is None:
             return
         error_via_api = None
         error_via_keep = None
         should_try_keep = ((self._manifest_text is None) and
                            util.keep_locator_pattern.match(
-                self._manifest_locator))
+                               self._manifest_locator))
         if ((self._manifest_text is None) and
             util.signed_locator_pattern.match(self._manifest_locator)):
             error_via_keep = self._populate_from_keep()
         if ((self._manifest_text is None) and
             util.signed_locator_pattern.match(self._manifest_locator)):
             error_via_keep = self._populate_from_keep()
@@ -740,7 +1358,7 @@ class Collection(CollectionBase):
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             # Nothing worked!
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             # Nothing worked!
-            raise arvados.errors.NotFoundError(
+            raise errors.NotFoundError(
                 ("Failed to retrieve collection '{}' " +
                  "from either API server ({}) or Keep ({})."
                  ).format(
                 ("Failed to retrieve collection '{}' " +
                  "from either API server ({}) or Keep ({})."
                  ).format(
@@ -748,303 +1366,374 @@ class Collection(CollectionBase):
                     error_via_api,
                     error_via_keep))
         # populate
                     error_via_api,
                     error_via_keep))
         # populate
-        import_manifest(self._manifest_text, self)
+        self._baseline_manifest = self._manifest_text
+        self._import_manifest(self._manifest_text)
 
 
-    def _populate_first(orig_func):
-        # Decorator for methods that read actual Collection data.
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            if self._items is None:
-                self._populate()
-            return orig_func(self, *args, **kwargs)
-        return wrapper
+
+    def _has_collection_uuid(self):
+        """Tell whether the manifest locator is a collection UUID.
+
+        The result is truthy when `self._manifest_locator` is set and matches
+        `util.collection_uuid_pattern`, and falsy otherwise.
+        """
+        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        self.save(no_locator=True)
+        """Support scoped auto-commit in a with: block."""
+        if exc_type is None:
+            if self.writable() and self._has_collection_uuid():
+                self.save()
+        self.stop_threads()
+
+    def stop_threads(self):
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
-    @_populate_first
-    def find(self, path, create=False, create_collection=False):
-        p = path.split("/")
-        if p[0] == '.':
-            del p[0]
-
-        if len(p) > 0:
-            item = self._items.get(p[0])
-            if len(p) == 1:
-                # item must be a file
-                if item is None and create:
-                    # create new file
-                    if create_collection:
-                        item = Collection(parent=self, num_retries=self.num_retries)
-                    else:
-                        item = ArvadosFile(self)
-                    self._items[p[0]] = item
-                return item
-            else:
-                if item is None and create:
-                    # create new collection
-                    item = Collection(parent=self, num_retries=self.num_retries)
-                    self._items[p[0]] = item
-                del p[0]
-                return item.find("/".join(p), create=create)
+    @synchronized
+    def manifest_locator(self):
+        """Get the manifest locator, if any.
+
+        The manifest locator will be set when the collection is loaded from an
+        API server record or the portable data hash of a manifest.
+
+        The manifest locator will be None if the collection is newly created or
+        was created directly from manifest text.  The method `save_new()` will
+        assign a manifest locator.
+
+        Returns the stored locator value unchanged.
+
+        """
+        return self._manifest_locator
+
+    @synchronized
+    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
+        if new_config is None:
+            new_config = self._config
+        if readonly:
+            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
         else:
         else:
-            return self
+            newcollection = Collection(parent=new_parent, apiconfig=new_config)
+
+        newcollection._clonefrom(self)
+        return newcollection
 
 
-    @_populate_first
+    @synchronized
     def api_response(self):
     def api_response(self):
-        """api_response() -> dict or None
+        """Returns information about this Collection fetched from the API server.
 
 
-        Returns information about this Collection fetched from the API server.
         If the Collection exists in Keep but not the API server, currently
         returns None.  Future versions may provide a synthetic response.
         If the Collection exists in Keep but not the API server, currently
         returns None.  Future versions may provide a synthetic response.
+
         """
         return self._api_response
 
         """
         return self._api_response
 
-    def open(self, path, mode):
-        mode = mode.replace("b", "")
-        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
-            raise ArgumentError("Bad mode '%s'" % mode)
-        create = (mode != "r")
-
-        f = self.find(path, create=create)
-        if f is None:
-            raise IOError((errno.ENOENT, "File not found"))
-        if not isinstance(f, ArvadosFile):
-            raise IOError((errno.EISDIR, "Path must refer to a file."))
+    def find_or_create(self, path, create_type):
+        """See `RichCollectionBase.find_or_create`
+
+        The path "." refers to this collection itself.  A leading "./" is
+        removed from `path` before delegating to the base class.
+        """
+        if path == ".":
+            return self
+        else:
+            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
 
 
-        if mode[0] == "w":
-            f.truncate(0)
+    def find(self, path):
+        """See `RichCollectionBase.find`
+
+        The path "." refers to this collection itself.  A leading "./" is
+        removed from `path` before delegating to the base class.
+        """
+        if path == ".":
+            return self
+        else:
+            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
 
 
-        if mode == "r":
-            return ArvadosFileReader(f, path, mode)
+    def remove(self, path, recursive=False):
+        """See `RichCollectionBase.remove`"""
+        if path == ".":
+            raise errors.ArgumentError("Cannot remove '.'")
         else:
         else:
-            return ArvadosFileWriter(f, path, mode)
+            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
 
 
-    @_populate_first
-    def modified(self):
-        for k,v in self._items.items():
-            if v.modified():
-                return True
-        return False
+    @must_be_writable
+    @synchronized
+    @retry_method
+    def save(self, merge=True, num_retries=None):
+        """Save collection to an existing collection record.
 
 
-    @_populate_first
-    def set_unmodified(self):
-        for k,v in self._items.items():
-            v.set_unmodified()
+        Commit pending buffer blocks to Keep, merge with remote record (if
+        merge=True, the default), and update the collection record.  Returns
+        the current manifest text.
 
 
-    @_populate_first
-    def __iter__(self):
-        return self._items.iterkeys()
+        Will raise AssertionError if not associated with a collection record on
+        the API server.  If you want to save a manifest to Keep only, see
+        `save_new()`.
 
 
-    @_populate_first
-    def iterkeys(self):
-        return self._items.iterkeys()
+        :merge:
+          Update and merge remote changes before saving.  Otherwise, any
+          remote changes will be ignored and overwritten.
 
 
-    @_populate_first
-    def __getitem__(self, k):
-        return self._items[k]
+        :num_retries:
+          Retry count on API calls (if None,  use the collection default)
 
 
-    @_populate_first
-    def __contains__(self, k):
-        return k in self._items
+        """
+        if not self.committed():
+            if not self._has_collection_uuid():
+                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
 
 
-    @_populate_first
-    def __len__(self):
-       return len(self._items)
+            self._my_block_manager().commit_all()
 
 
-    @_populate_first
-    def __delitem__(self, p):
-        del self._items[p]
+            if merge:
+                self.update()
 
 
-    @_populate_first
-    def keys(self):
-        return self._items.keys()
+            text = self.manifest_text(strip=False)
+            self._remember_api_response(self._my_api().collections().update(
+                uuid=self._manifest_locator,
+                body={'manifest_text': text}
+                ).execute(
+                    num_retries=num_retries))
+            self._manifest_text = self._api_response["manifest_text"]
+            self._portable_data_hash = self._api_response["portable_data_hash"]
+            self.set_committed(True)
 
 
-    @_populate_first
-    def values(self):
-        return self._items.values()
+        return self._manifest_text
 
 
-    @_populate_first
-    def items(self):
-        return self._items.items()
 
 
-    @_populate_first
-    def exists(self, path):
-        return self.find(path) != None
-
-    @_populate_first
-    def remove(self, path):
-        p = path.split("/")
-        if p[0] == '.':
-            del p[0]
-
-        if len(p) > 0:
-            item = self._items.get(p[0])
-            if item is None:
-                raise IOError((errno.ENOENT, "File not found"))
-            if len(p) == 1:
-                del self._items[p[0]]
-            else:
-                del p[0]
-                item.remove("/".join(p))
-        else:
-            raise IOError((errno.ENOENT, "File not found"))
+    @must_be_writable
+    @synchronized
+    @retry_method
+    def save_new(self, name=None,
+                 create_collection_record=True,
+                 owner_uuid=None,
+                 ensure_unique_name=False,
+                 num_retries=None):
+        """Save collection to a new collection record.
 
 
-    @_populate_first
-    def manifest_text(self, strip=False, normalize=False):
-        if self.modified() or self._manifest_text is None or normalize:
-            return export_manifest(self, stream_name=".", portable_locators=strip)
-        else:
-            if strip:
-                return self.stripped_manifest()
-            else:
-                return self._manifest_text
+        Commit pending buffer blocks to Keep and, when create_collection_record
+        is True (default), create a new collection record.  After creating a
+        new collection record, this Collection object will be associated with
+        the new record used by `save()`.  Returns the current manifest text.
 
 
-    def portable_data_hash(self):
-        stripped = self.manifest_text(strip=True)
-        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+        :name:
+          The collection name.
+          If None, the name "New collection" is used and
+          ensure_unique_name is forced to True.
 
 
-    @_populate_first
-    def save(self, no_locator=False):
-        if self.modified():
-            self._my_block_manager().commit_all()
-            self._my_keep().put(self.manifest_text(strip=True))
-            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
-                self._api_response = self._my_api().collections().update(
-                    uuid=self._manifest_locator,
-                    body={'manifest_text': self.manifest_text(strip=False)}
-                    ).execute(
-                        num_retries=self.num_retries)
-            elif not no_locator:
-                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
-            self.set_unmodified()
-
-    @_populate_first
-    def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
+        :create_collection_record:
+           If True, create a collection record on the API server.
+           If False, only commit blocks to Keep and return the manifest text.
+
+        :owner_uuid:
+          the user, or project uuid that will own this collection.
+          If None, defaults to the current user.
+
+        :ensure_unique_name:
+          If True, ask the API server to rename the collection
+          if it conflicts with a collection with the same name and owner.  If
+          False, a name conflict will result in an error.
+
+        :num_retries:
+          Retry count on API calls (if None, use the collection default)
+
+        When a record is created, the returned text is the "manifest_text"
+        field read back from the stored API response; otherwise it is the
+        manifest text generated locally with strip=False.
+
+        """
         self._my_block_manager().commit_all()
         self._my_block_manager().commit_all()
-        self._my_keep().put(self.manifest_text(strip=True))
-        body = {"manifest_text": self.manifest_text(strip=False),
-                "name": name}
-        if owner_uuid:
-            body["owner_uuid"] = owner_uuid
-        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
-        self._manifest_locator = self._api_response["uuid"]
-        self.set_unmodified()
-
-    @_populate_first
-    def rename(self, old, new):
-        old_path, old_fn = os.path.split(old)
-        old_col = self.find(path)
-        if old_col is None:
-            raise IOError((errno.ENOENT, "File not found"))
-        if not isinstance(old_p, Collection):
-            raise IOError((errno.ENOTDIR, "Parent in path is a file, not a directory"))
-        if old_fn in old_col:
-            new_path, new_fn = os.path.split(new)
-            new_col = self.find(new_path, create=True, create_collection=True)
-            if not isinstance(new_col, Collection):
-                raise IOError((errno.ENOTDIR, "Destination is a file, not a directory"))
-            ent = old_col[old_fn]
-            del old_col[old_fn]
-            ent.parent = new_col
-            new_col[new_fn] = ent
-        else:
-            raise IOError((errno.ENOENT, "File not found"))
+        # Manifest generated locally with strip=False; this is what gets
+        # sent to the API server and what is returned when no record is made.
+        text = self.manifest_text(strip=False)
+
+        if create_collection_record:
+            if name is None:
+                # No name supplied: fall back to a generic one and force
+                # ensure_unique_name so the request asks the API server to
+                # rename on conflict instead of failing.
+                name = "New collection"
+                ensure_unique_name = True
+
+            body = {"manifest_text": text,
+                    "name": name,
+                    "replication_desired": self.replication_desired}
+            # Leave owner_uuid out of the request entirely when not given.
+            if owner_uuid:
+                body["owner_uuid"] = owner_uuid
+
+            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
+            # From here on use the manifest text from the API response
+            # rather than the locally generated one.
+            text = self._api_response["manifest_text"]
+
+            # Associate this object with the record that was just created.
+            self._manifest_locator = self._api_response["uuid"]
+            self._portable_data_hash = self._api_response["portable_data_hash"]
+
+            self._manifest_text = text
+            self.set_committed(True)
 
 
-def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
-    if into_collection is not None:
-        if len(into_collection) > 0:
+        return text
+
+    @synchronized
+    def _import_manifest(self, manifest_text):
+        """Import a manifest into a `Collection`.
+
+        :manifest_text:
+          The manifest text to import from.
+
+        """
+        if len(self) > 0:
             raise ArgumentError("Can only import manifest into an empty collection")
             raise ArgumentError("Can only import manifest into an empty collection")
-        c = into_collection
-    else:
-        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)
-
-    STREAM_NAME = 0
-    BLOCKS = 1
-    SEGMENTS = 2
-
-    stream_name = None
-    state = STREAM_NAME
-
-    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
-        tok = n.group(1)
-        sep = n.group(2)
-
-        if state == STREAM_NAME:
-            # starting a new stream
-            stream_name = tok.replace('\\040', ' ')
-            blocks = []
-            segments = []
-            streamoffset = 0L
-            state = BLOCKS
-            continue
-
-        if state == BLOCKS:
-            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
-            if s:
-                blocksize = long(s.group(1))
-                blocks.append(Range(tok, streamoffset, blocksize))
-                streamoffset += blocksize
-            else:
-                state = SEGMENTS
-
-        if state == SEGMENTS:
-            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
-            if s:
-                pos = long(s.group(1))
-                size = long(s.group(2))
-                name = s.group(3).replace('\\040', ' ')
-                f = c.find("%s/%s" % (stream_name, name), create=True)
-                f.add_segment(blocks, pos, size)
-            else:
-                # error!
-                raise errors.SyntaxError("Invalid manifest format")
-
-        if sep == "\n":
-            stream_name = None
-            state = STREAM_NAME
-
-    c.set_unmodified()
-    return c
-
-def export_manifest(item, stream_name=".", portable_locators=False):
-    buf = ""
-    if isinstance(item, Collection):
-        stream = {}
-        sorted_keys = sorted(item.keys())
-        for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
-            v = item[k]
-            st = []
-            for s in v.segments:
-                loc = s.locator
-                if loc.startswith("bufferblock"):
-                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
-                if portable_locators:
-                    loc = KeepLocator(loc).stripped()
-                st.append(LocatorAndRange(loc, locator_block_size(loc),
-                                     s.segment_offset, s.range_size))
-            stream[k] = st
-        if stream:
-            buf += ' '.join(normalize_stream(stream_name, stream))
-            buf += "\n"
-        for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
-            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
-    elif isinstance(item, ArvadosFile):
-        st = []
-        for s in item.segments:
-            loc = s.locator
-            if loc.startswith("bufferblock"):
-                loc = item._bufferblocks[loc].calculate_locator()
-            if portable_locators:
-                loc = KeepLocator(loc).stripped()
-            st.append(LocatorAndRange(loc, locator_block_size(loc),
-                                 s.segment_offset, s.range_size))
-        stream[stream_name] = st
-        buf += ' '.join(normalize_stream(stream_name, stream))
-        buf += "\n"
-    return buf
+
+        STREAM_NAME = 0
+        BLOCKS = 1
+        SEGMENTS = 2
+
+        stream_name = None
+        state = STREAM_NAME
+
+        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+            tok = token_and_separator.group(1)
+            sep = token_and_separator.group(2)
+
+            if state == STREAM_NAME:
+                # starting a new stream
+                stream_name = tok.replace('\\040', ' ')
+                blocks = []
+                segments = []
+                streamoffset = 0L
+                state = BLOCKS
+                self.find_or_create(stream_name, COLLECTION)
+                continue
+
+            if state == BLOCKS:
+                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                if block_locator:
+                    blocksize = long(block_locator.group(1))
+                    blocks.append(Range(tok, streamoffset, blocksize, 0))
+                    streamoffset += blocksize
+                else:
+                    state = SEGMENTS
+
+            if state == SEGMENTS:
+                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                if file_segment:
+                    pos = long(file_segment.group(1))
+                    size = long(file_segment.group(2))
+                    name = file_segment.group(3).replace('\\040', ' ')
+                    filepath = os.path.join(stream_name, name)
+                    afile = self.find_or_create(filepath, FILE)
+                    if isinstance(afile, ArvadosFile):
+                        afile.add_segment(blocks, pos, size)
+                    else:
+                        raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
+                else:
+                    # error!
+                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
+
+            if sep == "\n":
+                stream_name = None
+                state = STREAM_NAME
+
+        self.set_committed(True)
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+
+
+class Subcollection(RichCollectionBase):
+    """This is a subdirectory within a collection that doesn't have its own API
+    server record.
+
+    Subcollection locking falls under the umbrella lock of its root collection.
+
+    """
+
+    def __init__(self, parent, name):
+        super(Subcollection, self).__init__(parent)
+        self.lock = self.root_collection().lock
+        self._manifest_text = None
+        self.name = name
+        self.num_retries = parent.num_retries
+
+    def root_collection(self):
+        return self.parent.root_collection()
+
+    def writable(self):
+        return self.root_collection().writable()
+
+    def _my_api(self):
+        return self.root_collection()._my_api()
+
+    def _my_keep(self):
+        return self.root_collection()._my_keep()
+
+    def _my_block_manager(self):
+        return self.root_collection()._my_block_manager()
+
+    def stream_name(self):
+        return os.path.join(self.parent.stream_name(), self.name)
+
+    @synchronized
+    def clone(self, new_parent, new_name):
+        c = Subcollection(new_parent, new_name)
+        c._clonefrom(self)
+        return c
+
+    @must_be_writable
+    @synchronized
+    def _reparent(self, newparent, newname):
+        self.set_committed(False)
+        self.flush()
+        self.parent.remove(self.name, recursive=True)
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock
+
+
+class CollectionReader(Collection):
+    """A read-only collection object.
+
+    Initialize from an api collection record locator, a portable data hash of a
+    manifest, or raw manifest text.  See `Collection` constructor for detailed
+    options.
+
+    """
+    def __init__(self, manifest_locator_or_text, *args, **kwargs):
+        """Create a read-only collection.
+
+        All arguments are forwarded unchanged to the `Collection`
+        constructor.
+
+        """
+        # writable() returns this flag, so the object reports itself as
+        # writable only while the base class constructor is running.
+        self._in_init = True
+        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
+        self._in_init = False
+
+        # Forego any locking since it should never change once initialized.
+        self.lock = NoopLock()
+
+        # Backwards compatibility with old CollectionReader
+        # all_streams() and all_files().  Left as None here and filled in
+        # on first use by the _populate_streams decorator.
+        self._streams = None
+
+    def writable(self):
+        return self._in_init
+
+    def _populate_streams(orig_func):
+        @functools.wraps(orig_func)
+        def populate_streams_wrapper(self, *args, **kwargs):
+            # Defer populating self._streams until needed since it creates a copy of the manifest.
+            if self._streams is None:
+                if self._manifest_text:
+                    self._streams = [sline.split()
+                                     for sline in self._manifest_text.split("\n")
+                                     if sline]
+                else:
+                    self._streams = []
+            return orig_func(self, *args, **kwargs)
+        return populate_streams_wrapper
+
+    @_populate_streams
+    def normalize(self):
+        """Normalize the streams returned by `all_streams`.
+
+        This method is kept for backwards compatability and only affects the
+        behavior of `all_streams()` and `all_files()`
+
+        """
+
+        # Rearrange streams
+        streams = {}
+        for s in self.all_streams():
+            for f in s.all_files():
+                streamname, filename = split(s.name() + "/" + f.name())
+                if streamname not in streams:
+                    streams[streamname] = {}
+                if filename not in streams[streamname]:
+                    streams[streamname][filename] = []
+                for r in f.segments:
+                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
+
+        self._streams = [normalize_stream(s, streams[s])
+                         for s in sorted(streams)]
+    @_populate_streams
+    def all_streams(self):
+        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
+                for s in self._streams]
+
+    @_populate_streams
+    def all_files(self):
+        for s in self.all_streams():
+            for f in s.all_files():
+                yield f