Merge branch '11507-deleted-bufferblocks' refs #11507
[arvados.git] / sdk / python / arvados / collection.py
index 6d5dd4ff15c7e3b7aba62acb0ba1f4278d16f1c7..f26d3a3d27c0b221d269d3d4a1beb8775268f7e6 100644 (file)
@@ -3,14 +3,17 @@ import logging
 import os
 import re
 import errno
 import os
 import re
 import errno
+import hashlib
 import time
 import time
+import threading
 
 from collections import deque
 from stat import *
 
 
 from collections import deque
 from stat import *
 
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, NoopLock
-from keep import *
-from .stream import StreamReader, normalize_stream
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from keep import KeepLocator, KeepClient
+from .stream import StreamReader
+from ._normalize_stream import normalize_stream
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
 import config
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
 import config
@@ -35,7 +38,8 @@ class CollectionBase(object):
         return self._keep_client
 
     def stripped_manifest(self):
         return self._keep_client
 
     def stripped_manifest(self):
-        """
+        """Get the manifest with locator hints stripped.
+
         Return the manifest for the current collection with all
         non-portable hints (i.e., permission signatures and other
         hints other than size hints) removed from the locators.
         Return the manifest for the current collection with all
         non-portable hints (i.e., permission signatures and other
         hints other than size hints) removed from the locators.
@@ -303,7 +307,8 @@ class CollectionWriter(CollectionBase):
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace")
+                "Manifest stream names cannot contain whitespace: '%s'" %
+                (newstreamname))
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
@@ -466,13 +471,15 @@ class ResumableCollectionWriter(CollectionWriter):
                 "resumable writer can't accept unsourced data")
         return super(ResumableCollectionWriter, self).write(data)
 
                 "resumable writer can't accept unsourced data")
         return super(ResumableCollectionWriter, self).write(data)
 
+
 ADD = "add"
 DEL = "del"
 MOD = "mod"
 ADD = "add"
 DEL = "del"
 MOD = "mod"
+TOK = "tok"
 FILE = "file"
 COLLECTION = "collection"
 
 FILE = "file"
 COLLECTION = "collection"
 
-class SynchronizedCollectionBase(CollectionBase):
+class RichCollectionBase(CollectionBase):
     """Base class for Collections and Subcollections.
 
     Implements the majority of functionality relating to accessing items in the
     """Base class for Collections and Subcollections.
 
     Implements the majority of functionality relating to accessing items in the
@@ -482,7 +489,8 @@ class SynchronizedCollectionBase(CollectionBase):
 
     def __init__(self, parent=None):
         self.parent = parent
 
     def __init__(self, parent=None):
         self.parent = parent
-        self._modified = True
+        self._committed = False
+        self._callback = None
         self._items = {}
 
     def _my_api(self):
         self._items = {}
 
     def _my_api(self):
@@ -494,7 +502,7 @@ class SynchronizedCollectionBase(CollectionBase):
     def _my_block_manager(self):
         raise NotImplementedError()
 
     def _my_block_manager(self):
         raise NotImplementedError()
 
-    def sync_mode(self):
+    def writable(self):
         raise NotImplementedError()
 
     def root_collection(self):
         raise NotImplementedError()
 
     def root_collection(self):
@@ -517,42 +525,39 @@ class SynchronizedCollectionBase(CollectionBase):
         the path.
 
         :create_type:
         the path.
 
         :create_type:
-          One of `arvado.collection.FILE` or
-          `arvado.collection.COLLECTION`.  If the path is not found, and value
+          One of `arvados.collection.FILE` or
+          `arvados.collection.COLLECTION`.  If the path is not found, and value
           of create_type is FILE then create and return a new ArvadosFile for
           the last path component.  If COLLECTION, then create and return a new
           Collection for the last path component.
 
         """
 
           of create_type is FILE then create and return a new ArvadosFile for
           the last path component.  If COLLECTION, then create and return a new
           Collection for the last path component.
 
         """
 
-        pathcomponents = path.split("/")
-
-        if pathcomponents and pathcomponents[0]:
+        pathcomponents = path.split("/", 1)
+        if pathcomponents[0]:
             item = self._items.get(pathcomponents[0])
             if len(pathcomponents) == 1:
             item = self._items.get(pathcomponents[0])
             if len(pathcomponents) == 1:
-                # item must be a file
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
-                        item = Subcollection(self)
+                        item = Subcollection(self, pathcomponents[0])
                     else:
                     else:
-                        item = ArvadosFile(self)
+                        item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._items[pathcomponents[0]] = item
-                    self._modified = True
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
                 if item is None:
                     # create new collection
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
                 if item is None:
                     # create new collection
-                    item = Subcollection(self)
+                    item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._items[pathcomponents[0]] = item
-                    self._modified = True
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                     self.notify(ADD, self, pathcomponents[0], item)
-                del pathcomponents[0]
-                if isinstance(item, SynchronizedCollectionBase):
-                    return item.find_or_create("/".join(pathcomponents), create_type)
+                if isinstance(item, RichCollectionBase):
+                    return item.find_or_create(pathcomponents[1], create_type)
                 else:
                 else:
-                    raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
         else:
             return self
 
         else:
             return self
 
@@ -560,33 +565,45 @@ class SynchronizedCollectionBase(CollectionBase):
     def find(self, path):
         """Recursively search the specified file path.
 
     def find(self, path):
         """Recursively search the specified file path.
 
-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
         found.
+        If path is invalid (ex: starts with '/'), an IOError exception will be
+        raised.
 
         """
 
         """
-        pathcomponents = path.split("/")
+        if not path:
+            raise errors.ArgumentError("Parameter 'path' is empty.")
 
 
-        if pathcomponents and pathcomponents[0]:
-            item = self._items.get(pathcomponents[0])
-            if len(pathcomponents) == 1:
-                # item must be a file
-                return item
-            else:
-                del pathcomponents[0]
-                if isinstance(item, SynchronizedCollectionBase):
-                    return item.find("/".join(pathcomponents))
-                else:
-                    raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+        pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
+        item = self._items.get(pathcomponents[0])
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
+            return item
         else:
         else:
-            return self
+            if isinstance(item, RichCollectionBase):
+                if pathcomponents[1]:
+                    return item.find(pathcomponents[1])
+                else:
+                    return item
+            else:
+                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 
 
-    def mkdirs(path):
+    @synchronized
+    def mkdirs(self, path):
         """Recursive subcollection create.
 
         """Recursive subcollection create.
 
-        Like `os.mkdirs()`.  Will create intermediate subcollections needed to
-        contain the leaf subcollection path.
+        Like `os.makedirs()`.  Will create intermediate subcollections needed
+        to contain the leaf subcollection path.
 
         """
 
         """
+
+        if self.find(path) != None:
+            raise IOError(errno.EEXIST, "Directory or file exists", path)
+
         return self.find_or_create(path, COLLECTION)
 
     def open(self, path, mode="r"):
         return self.find_or_create(path, COLLECTION)
 
     def open(self, path, mode="r"):
@@ -612,8 +629,8 @@ class SynchronizedCollectionBase(CollectionBase):
             raise errors.ArgumentError("Bad mode '%s'" % mode)
         create = (mode != "r")
 
             raise errors.ArgumentError("Bad mode '%s'" % mode)
         create = (mode != "r")
 
-        if create and self.sync_mode() == SYNC_READONLY:
-            raise IOError((errno.EROFS, "Collection is read only"))
+        if create and not self.writable():
+            raise IOError(errno.EROFS, "Collection is read only")
 
         if create:
             arvfile = self.find_or_create(path, FILE)
 
         if create:
             arvfile = self.find_or_create(path, FILE)
@@ -621,9 +638,9 @@ class SynchronizedCollectionBase(CollectionBase):
             arvfile = self.find(path)
 
         if arvfile is None:
             arvfile = self.find(path)
 
         if arvfile is None:
-            raise IOError((errno.ENOENT, "File not found"))
+            raise IOError(errno.ENOENT, "File not found", path)
         if not isinstance(arvfile, ArvadosFile):
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError((errno.EISDIR, "Path must refer to a file."))
+            raise IOError(errno.EISDIR, "Is a directory", path)
 
         if mode[0] == "w":
             arvfile.truncate(0)
 
         if mode[0] == "w":
             arvfile.truncate(0)
@@ -631,49 +648,55 @@ class SynchronizedCollectionBase(CollectionBase):
         name = os.path.basename(path)
 
         if mode == "r":
         name = os.path.basename(path)
 
         if mode == "r":
-            return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileReader(arvfile, num_retries=self.num_retries)
         else:
         else:
-            return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
 
 
-    @synchronized
     def modified(self):
     def modified(self):
-        """Test if the collection (or any subcollection or file) has been modified
-        since it was created."""
-        if self._modified:
-            return True
-        for k,v in self._items.items():
-            if v.modified():
-                return True
-        return False
+        """Determine if the collection has been modified since last commited."""
+        return not self.committed()
 
     @synchronized
 
     @synchronized
-    def set_unmodified(self):
-        """Recursively clear modified flag."""
-        self._modified = False
-        for k,v in self._items.items():
-            v.set_unmodified()
+    def committed(self):
+        """Determine if the collection has been committed to the API server."""
+        return self._committed
+
+    @synchronized
+    def set_committed(self, value=True):
+        """Recursively set committed flag.
+
+        If value is True, set committed to be True for this and all children.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        if value:
+            for k,v in self._items.items():
+                v.set_committed(True)
+            self._committed = True
+        else:
+            self._committed = False
+            if self.parent is not None:
+                self.parent.set_committed(False)
 
     @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
         return iter(self._items.keys())
 
 
     @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
         return iter(self._items.keys())
 
-    @synchronized
-    def iterkeys(self):
-        """Iterate over names of files and collections directly contained in this collection."""
-        return self._items.keys()
-
     @synchronized
     def __getitem__(self, k):
     @synchronized
     def __getitem__(self, k):
-        """Get a file or collection that is directly contained by this collection.  If
-        you want to search a path, use `find()` instead.
+        """Get a file or collection that is directly contained by this collection.
+
+        If you want to search a path, use `find()` instead.
+
         """
         return self._items[k]
 
     @synchronized
     def __contains__(self, k):
         """
         return self._items[k]
 
     @synchronized
     def __contains__(self, k):
-        """If there is a file or collection a directly contained by this collection
-        with name `k`."""
+        """Test if there is a file or collection a directly contained by this collection."""
         return k in self._items
 
     @synchronized
         return k in self._items
 
     @synchronized
@@ -686,7 +709,7 @@ class SynchronizedCollectionBase(CollectionBase):
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
-        self._modified = True
+        self.set_committed(False)
         self.notify(DEL, self, p, None)
 
     @synchronized
         self.notify(DEL, self, p, None)
 
     @synchronized
@@ -706,7 +729,7 @@ class SynchronizedCollectionBase(CollectionBase):
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
-        return self.find(path) != None
+        return self.find(path) is not None
 
     @must_be_writable
     @synchronized
 
     @must_be_writable
     @synchronized
@@ -716,60 +739,84 @@ class SynchronizedCollectionBase(CollectionBase):
         :recursive:
           Specify whether to remove non-empty subcollections (True), or raise an error (False).
         """
         :recursive:
           Specify whether to remove non-empty subcollections (True), or raise an error (False).
         """
-        pathcomponents = path.split("/")
 
 
-        if len(pathcomponents) > 0:
-            item = self._items.get(pathcomponents[0])
-            if item is None:
-                raise IOError((errno.ENOENT, "File not found"))
-            if len(pathcomponents) == 1:
-                if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                    raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
-                deleteditem = self._items[pathcomponents[0]]
-                del self._items[pathcomponents[0]]
-                self._modified = True
-                self.notify(DEL, self, pathcomponents[0], deleteditem)
-            else:
-                del pathcomponents[0]
-                item.remove("/".join(pathcomponents))
+        if not path:
+            raise errors.ArgumentError("Parameter 'path' is empty.")
+
+        pathcomponents = path.split("/", 1)
+        item = self._items.get(pathcomponents[0])
+        if item is None:
+            raise IOError(errno.ENOENT, "File not found", path)
+        if len(pathcomponents) == 1:
+            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
+                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
+            deleteditem = self._items[pathcomponents[0]]
+            del self._items[pathcomponents[0]]
+            self.set_committed(False)
+            self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
         else:
-            raise IOError((errno.ENOENT, "File not found"))
+            item.remove(pathcomponents[1])
 
 
-    def _cloneinto(self, target):
-        for k,v in self._items.items():
-            target._items[k] = v.clone(target)
+    def _clonefrom(self, source):
+        for k,v in source.items():
+            self._items[k] = v.clone(self, k)
 
     def clone(self):
         raise NotImplementedError()
 
     @must_be_writable
     @synchronized
 
     def clone(self):
         raise NotImplementedError()
 
     @must_be_writable
     @synchronized
-    def copy(self, source, target_path, source_collection=None, overwrite=False):
-        """Copy a file or subcollection to a new path in this collection.
+    def add(self, source_obj, target_name, overwrite=False, reparent=False):
+        """Copy or move a file or subcollection to this collection.
 
 
-        :source:
-          An ArvadosFile, Subcollection, or string with a path to source file or subcollection
+        :source_obj:
+          An ArvadosFile, or Subcollection object
 
 
-        :target_path:
-          Destination file or path.  If the target path already exists and is a
-          subcollection, the item will be placed inside the subcollection.  If
-          the target path already exists and is a file, this will raise an error
-          unless you specify `overwrite=True`.
-
-        :source_collection:
-          Collection to copy `source_path` from (default `self`)
+        :target_name:
+          Destination item name.  If the target name already exists and is a
+          file, this will raise an error unless you specify `overwrite=True`.
 
         :overwrite:
           Whether to overwrite target file if it already exists.
 
         :overwrite:
           Whether to overwrite target file if it already exists.
+
+        :reparent:
+          If True, source_obj will be moved from its parent collection to this collection.
+          If False, source_obj will be copied and the parent collection will be
+          unmodified.
+
         """
         """
+
+        if target_name in self and not overwrite:
+            raise IOError(errno.EEXIST, "File already exists", target_name)
+
+        modified_from = None
+        if target_name in self:
+            modified_from = self[target_name]
+
+        # Actually make the move or copy.
+        if reparent:
+            source_obj._reparent(self, target_name)
+            item = source_obj
+        else:
+            item = source_obj.clone(self, target_name)
+
+        self._items[target_name] = item
+        self.set_committed(False)
+
+        if modified_from:
+            self.notify(MOD, self, target_name, (modified_from, item))
+        else:
+            self.notify(ADD, self, target_name, item)
+
+    def _get_src_target(self, source, target_path, source_collection, create_dest):
         if source_collection is None:
             source_collection = self
 
         if source_collection is None:
             source_collection = self
 
-        # Find the object to copy
+        # Find the object
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
-                raise IOError((errno.ENOENT, "File not found"))
+                raise IOError(errno.ENOENT, "File not found", source)
             sourcecomponents = source.split("/")
         else:
             source_obj = source
             sourcecomponents = source.split("/")
         else:
             source_obj = source
@@ -779,41 +826,127 @@ class SynchronizedCollectionBase(CollectionBase):
         targetcomponents = target_path.split("/")
 
         # Determine the name to use.
         targetcomponents = target_path.split("/")
 
         # Determine the name to use.
-        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 
         if not target_name:
             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 
 
         if not target_name:
             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 
-        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        if create_dest:
+            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        else:
+            if len(targetcomponents) > 1:
+                target_dir = self.find("/".join(targetcomponents[0:-1]))
+            else:
+                target_dir = self
 
 
-        with target_dir.lock:
-            if target_name in target_dir:
-                if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
-                    target_dir = target_dir[target_name]
-                    target_name = sourcecomponents[-1]
-                elif not overwrite:
-                    raise IOError((errno.EEXIST, "File already exists"))
+        if target_dir is None:
+            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 
 
-            modified_from = None
-            if target_name in target_dir:
-                modified_from = target_dir[target_name]
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
 
 
-            # Actually make the copy.
-            dup = source_obj.clone(target_dir)
-            target_dir._items[target_name] = dup
-            target_dir._modified = True
+        return (source_obj, target_dir, target_name)
 
 
-        if modified_from:
-            self.notify(MOD, target_dir, target_name, (modified_from, dup))
-        else:
-            self.notify(ADD, target_dir, target_name, dup)
+    @must_be_writable
+    @synchronized
+    def copy(self, source, target_path, source_collection=None, overwrite=False):
+        """Copy a file or subcollection to a new path in this collection.
 
 
+        :source:
+          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
+
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.
+
+        :source_collection:
+          Collection to copy `source_path` from (default `self`)
+
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+        """
+
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+        target_dir.add(source_obj, target_name, overwrite, False)
+
+    @must_be_writable
     @synchronized
     @synchronized
-    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+    def rename(self, source, target_path, source_collection=None, overwrite=False):
+        """Move a file or subcollection from `source_collection` to a new path in this collection.
+
+        :source:
+          A string with a path to source file or subcollection.
+
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.
+
+        :source_collection:
+          Collection to copy `source_path` from (default `self`)
+
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+        """
+
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+        if not source_obj.writable():
+            raise IOError(errno.EROFS, "Source collection is read only", source)
+        target_dir.add(source_obj, target_name, overwrite, True)
+
+    def portable_manifest_text(self, stream_name="."):
         """Get the manifest text for this collection, sub collections and files.
 
         """Get the manifest text for this collection, sub collections and files.
 
+        This method does not flush outstanding blocks to Keep.  It will return
+        a normalized manifest with access tokens stripped.
+
         :stream_name:
         :stream_name:
-          Name of the stream (directory)
+          Name to use for this stream (directory)
+
+        """
+        return self._get_manifest_text(stream_name, True, True)
+
+    @synchronized
+    def manifest_text(self, stream_name=".", strip=False, normalize=False,
+                      only_committed=False):
+        """Get the manifest text for this collection, sub collections and files.
+
+        This method will flush outstanding blocks to Keep.  By default, it will
+        not normalize an unmodified manifest or strip access tokens.
+
+        :stream_name:
+          Name to use for this stream (directory)
+
+        :strip:
+          If True, remove signing tokens from block locators if present.
+          If False (default), block locators are left unchanged.
+
+        :normalize:
+          If True, always export the manifest text in normalized form
+          even if the Collection is not modified.  If False (default) and the collection
+          is not modified, return the original manifest text even if it is not
+          in normalized form.
+
+        :only_committed:
+          If True, don't commit pending blocks.
+
+        """
+
+        if not only_committed:
+            self._my_block_manager().commit_all()
+        return self._get_manifest_text(stream_name, strip, normalize,
+                                       only_committed=only_committed)
+
+    @synchronized
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
+        """Get the manifest text for this collection, sub collections and files.
+
+        :stream_name:
+          Name to use for this stream (directory)
 
         :strip:
           If True, remove signing tokens from block locators if present.
 
         :strip:
           If True, remove signing tokens from block locators if present.
@@ -825,20 +958,24 @@ class SynchronizedCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """
 
         """
 
-        if self.modified() or self._manifest_text is None or normalize:
-            item  = self
+        if not self.committed() or self._manifest_text is None or normalize:
             stream = {}
             stream = {}
-            buf = ""
-            sorted_keys = sorted(item.keys())
-            for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
+            buf = []
+            sorted_keys = sorted(self.keys())
+            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                 # Create a stream per file `k`
                 # Create a stream per file `k`
-                arvfile = item[filename]
+                arvfile = self[filename]
                 filestream = []
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
                 filestream = []
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -846,11 +983,10 @@ class SynchronizedCollectionBase(CollectionBase):
                                          segment.segment_offset, segment.range_size))
                 stream[filename] = filestream
             if stream:
                                          segment.segment_offset, segment.range_size))
                 stream[filename] = filestream
             if stream:
-                buf += ' '.join(normalize_stream(stream_name, stream))
-                buf += "\n"
-            for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
-                buf += item[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip)
-            return buf
+                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
+            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
+                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
+            return "".join(buf)
         else:
             if strip:
                 return self.stripped_manifest()
         else:
             if strip:
                 return self.stripped_manifest()
@@ -859,24 +995,27 @@ class SynchronizedCollectionBase(CollectionBase):
 
     @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
 
     @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
-        """
-        Generate list of add/modify/delete actions which, when given to `apply`, will
-        change `self` to match `end_collection`
+        """Generate list of add/modify/delete actions.
+
+        When given to `apply`, will change `self` to match `end_collection`
+
         """
         changes = []
         if holding_collection is None:
             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
         for k in self:
             if k not in end_collection:
         """
         changes = []
         if holding_collection is None:
             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
         for k in self:
             if k not in end_collection:
-               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
+               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
         for k in end_collection:
             if k in self:
                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
         for k in end_collection:
             if k in self:
                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
-                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
+                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+                else:
+                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
             else:
-                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
+                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
 
     @must_be_writable
         return changes
 
     @must_be_writable
@@ -888,12 +1027,14 @@ class SynchronizedCollectionBase(CollectionBase):
         alternate path indicating the conflict.
 
         """
         alternate path indicating the conflict.
 
         """
+        if changes:
+            self.set_committed(False)
         for change in changes:
             event_type = change[0]
             path = change[1]
             initial = change[2]
             local = self.find(path)
         for change in changes:
             event_type = change[0]
             path = change[1]
             initial = change[2]
             local = self.find(path)
-            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
+            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                     time.gmtime()))
             if event_type == ADD:
                 if local is None:
                                                                     time.gmtime()))
             if event_type == ADD:
                 if local is None:
@@ -903,7 +1044,7 @@ class SynchronizedCollectionBase(CollectionBase):
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
-            elif event_type == MOD:
+            elif event_type == MOD or event_type == TOK:
                 final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
                 final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
@@ -928,14 +1069,37 @@ class SynchronizedCollectionBase(CollectionBase):
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
-        stripped = self.manifest_text(strip=True)
-        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+        if self._manifest_locator and self.committed():
+            # If the collection is already saved on the API server, and it's committed
+            # then return API server's PDH response.
+            return self._portable_data_hash
+        else:
+            stripped = self.portable_manifest_text()
+            return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+
+    @synchronized
+    def subscribe(self, callback):
+        if self._callback is None:
+            self._callback = callback
+        else:
+            raise errors.ArgumentError("A callback is already set on this collection.")
+
+    @synchronized
+    def unsubscribe(self):
+        if self._callback is not None:
+            self._callback = None
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+        self.root_collection().notify(event, collection, name, item)
 
     @synchronized
     def __eq__(self, other):
         if other is self:
             return True
 
     @synchronized
     def __eq__(self, other):
         if other is self:
             return True
-        if not isinstance(other, SynchronizedCollectionBase):
+        if not isinstance(other, RichCollectionBase):
             return False
         if len(self._items) != len(other):
             return False
             return False
         if len(self._items) != len(other):
             return False
@@ -949,12 +1113,22 @@ class SynchronizedCollectionBase(CollectionBase):
     def __ne__(self, other):
         return not self.__eq__(other)
 
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def flush(self):
+        """Flush bufferblocks to Keep."""
+        for e in self.values():
+            e.flush()
+
+
+class Collection(RichCollectionBase):
+    """Represents the root of an Arvados Collection.
 
 
-class Collection(SynchronizedCollectionBase):
-    """Represents the root of an Arvados Collection, which may be associated with
-    an API server Collection record.
+    This class is threadsafe.  The root collection object, all subcollections
+    and files are protected by a single lock (i.e. each access locks the entire
+    collection).
 
 
-    Brief summary of useful methods:
+    Brief summary of
+    useful methods:
 
     :To read an existing file:
       `c.open("myfile", "r")`
 
     :To read an existing file:
       `c.open("myfile", "r")`
@@ -980,9 +1154,8 @@ class Collection(SynchronizedCollectionBase):
     :To merge remote changes into this object:
       `c.update()`
 
     :To merge remote changes into this object:
       `c.update()`
 
-    This class is threadsafe.  The root collection object, all subcollections
-    and files are protected by a single lock (i.e. each access locks the entire
-    collection).
+    Must be associated with an API server Collection record (during
+    initialization, or using `save_new`) to use `save` or `update`
 
     """
 
 
     """
 
@@ -992,7 +1165,9 @@ class Collection(SynchronizedCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None,
+                 put_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1000,24 +1175,36 @@ class Collection(SynchronizedCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
+        self.put_threads = put_threads
 
         if apiconfig:
             self._config = apiconfig
 
         if apiconfig:
             self._config = apiconfig
@@ -1027,11 +1214,11 @@ class Collection(SynchronizedCollectionBase):
         self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
         self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
+        self._portable_data_hash = None
         self._api_response = None
         self._api_response = None
+        self._past_versions = set()
 
 
-        self._sync = SYNC_EXPLICIT
         self.lock = threading.RLock()
         self.lock = threading.RLock()
-        self.callbacks = []
         self.events = None
 
         if manifest_locator_or_text:
         self.events = None
 
         if manifest_locator_or_text:
@@ -1043,10 +1230,12 @@ class Collection(SynchronizedCollectionBase):
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
-                    "Argument to CollectionReader must be a manifest or a collection UUID")
-
-            self._populate()
+                    "Argument to CollectionReader is not a manifest or a collection UUID")
 
 
+            try:
+                self._populate()
+            except (IOError, errors.SyntaxError) as e:
+                raise errors.ArgumentError("Error processing manifest text: %s", e)
 
     def root_collection(self):
         return self
 
     def root_collection(self):
         return self
@@ -1054,29 +1243,42 @@ class Collection(SynchronizedCollectionBase):
     def stream_name(self):
         return "."
 
     def stream_name(self):
         return "."
 
-    def sync_mode(self):
-        return self._sync
+    def writable(self):
+        return True
+
+    @synchronized
+    def known_past_version(self, modified_at_and_portable_data_hash):
+        return modified_at_and_portable_data_hash in self._past_versions
 
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
 
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
-        """Fetch the latest collection record on the API server and merge it with the
-        current collection contents.
+        """Merge the latest collection on the API server with the current collection."""
 
 
-        """
         if other is None:
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
         if other is None:
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
+                response.get("portable_data_hash") != self.portable_data_hash()):
+                # The record on the server is different from our current one, but we've seen it before,
+                # so ignore it because it's already been merged.
+                # However, if it's the same as our current record, proceed with the update, because we want to update
+                # our tokens.
+                return
+            else:
+                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
+        self._manifest_text = self.manifest_text()
 
     @synchronized
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
 
     @synchronized
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
         return self._api_client
 
     @synchronized
@@ -1091,9 +1293,16 @@ class Collection(SynchronizedCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
         return self._block_manager
 
         return self._block_manager
 
+    def _remember_api_response(self, response):
+        self._api_response = response
+        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -1103,10 +1312,15 @@ class Collection(SynchronizedCollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            self._api_response = self._my_api().collections().get(
+            self._remember_api_response(self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
                 uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries)
+                    num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
             self._manifest_text = self._api_response['manifest_text']
+            self._portable_data_hash = self._api_response['portable_data_hash']
+            # If not overriden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
             return None
         except Exception as e:
             return e
@@ -1144,7 +1358,7 @@ class Collection(SynchronizedCollectionBase):
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             # Nothing worked!
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             # Nothing worked!
-            raise arvados.errors.NotFoundError(
+            raise errors.NotFoundError(
                 ("Failed to retrieve collection '{}' " +
                  "from either API server ({}) or Keep ({})."
                  ).format(
                 ("Failed to retrieve collection '{}' " +
                  "from either API server ({}) or Keep ({})."
                  ).format(
@@ -1164,13 +1378,31 @@ class Collection(SynchronizedCollectionBase):
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
-        if self._sync != SYNC_READONLY and self._has_collection_uuid():
-            self.save()
+        if exc_type is None:
+            if self.writable() and self._has_collection_uuid():
+                self.save()
+        self.stop_threads()
+
+    def stop_threads(self):
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
     @synchronized
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
     @synchronized
-    def clone(self, new_parent=None, readonly=False, new_config=None):
+    def manifest_locator(self):
+        """Get the manifest locator, if any.
+
+        The manifest locator will be set when the collection is loaded from an
+        API server record or the portable data hash of a manifest.
+
+        The manifest locator will be None if the collection is newly created or
+        was created directly from manifest text.  The method `save_new()` will
+        assign a manifest locator.
+
+        """
+        return self._manifest_locator
+
+    @synchronized
+    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
         if new_config is None:
             new_config = self._config
         if readonly:
         if new_config is None:
             new_config = self._config
         if readonly:
@@ -1178,9 +1410,7 @@ class Collection(SynchronizedCollectionBase):
         else:
             newcollection = Collection(parent=new_parent, apiconfig=new_config)
 
         else:
             newcollection = Collection(parent=new_parent, apiconfig=new_config)
 
-        newcollection._sync = None
-        self._cloneinto(newcollection)
-        newcollection._sync = SYNC_READONLY if readonly else SYNC_EXPLICIT
+        newcollection._clonefrom(self)
         return newcollection
 
     @synchronized
         return newcollection
 
     @synchronized
@@ -1194,21 +1424,21 @@ class Collection(SynchronizedCollectionBase):
         return self._api_response
 
     def find_or_create(self, path, create_type):
         return self._api_response
 
     def find_or_create(self, path, create_type):
-        """See `SynchronizedCollectionBase.find_or_create`"""
+        """See `RichCollectionBase.find_or_create`"""
         if path == ".":
             return self
         else:
             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
 
     def find(self, path):
         if path == ".":
             return self
         else:
             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
 
     def find(self, path):
-        """See `SynchronizedCollectionBase.find`"""
+        """See `RichCollectionBase.find`"""
         if path == ".":
             return self
         else:
             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
 
     def remove(self, path, recursive=False):
         if path == ".":
             return self
         else:
             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
 
     def remove(self, path, recursive=False):
-        """See `SynchronizedCollectionBase.remove`"""
+        """See `RichCollectionBase.remove`"""
         if path == ".":
             raise errors.ArgumentError("Cannot remove '.'")
         else:
         if path == ".":
             raise errors.ArgumentError("Cannot remove '.'")
         else:
@@ -1218,52 +1448,67 @@ class Collection(SynchronizedCollectionBase):
     @synchronized
     @retry_method
     def save(self, merge=True, num_retries=None):
     @synchronized
     @retry_method
     def save(self, merge=True, num_retries=None):
-        """Commit pending buffer blocks to Keep, merge with remote record (if
-        update=True), write the manifest to Keep, and update the collection
-        record.
+        """Save collection to an existing collection record.
+
+        Commit pending buffer blocks to Keep, merge with remote record (if
+        merge=True, the default), and update the collection record.  Returns
+        the current manifest text.
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
         `save_new()`.
 
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
         `save_new()`.
 
-        :update:
+        :merge:
           Update and merge remote changes before saving.  Otherwise, any
           remote changes will be ignored and overwritten.
 
           Update and merge remote changes before saving.  Otherwise, any
           remote changes will be ignored and overwritten.
 
+        :num_retries:
+          Retry count on API calls (if None,  use the collection default)
+
         """
         """
-        if self.modified():
+        if not self.committed():
             if not self._has_collection_uuid():
             if not self._has_collection_uuid():
-                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
+                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
+
             self._my_block_manager().commit_all()
             self._my_block_manager().commit_all()
+
             if merge:
                 self.update()
             if merge:
                 self.update()
-            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
 
             text = self.manifest_text(strip=False)
 
             text = self.manifest_text(strip=False)
-            self._api_response = self._my_api().collections().update(
+            self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
-                    num_retries=num_retries)
+                    num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
             self._manifest_text = self._api_response["manifest_text"]
-            self.set_unmodified()
+            self._portable_data_hash = self._api_response["portable_data_hash"]
+            self.set_committed(True)
+
+        return self._manifest_text
 
 
     @must_be_writable
     @synchronized
     @retry_method
 
 
     @must_be_writable
     @synchronized
     @retry_method
-    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
-        """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
-        a new collection record (if create_collection_record True).
-
-        After creating a new collection record, this Collection object will be
-        associated with the new record used by `save()`.
+    def save_new(self, name=None,
+                 create_collection_record=True,
+                 owner_uuid=None,
+                 ensure_unique_name=False,
+                 num_retries=None):
+        """Save collection to a new collection record.
+
+        Commit pending buffer blocks to Keep and, when create_collection_record
+        is True (default), create a new collection record.  After creating a
+        new collection record, this Collection object will be associated with
+        the new record used by `save()`.  Returns the current manifest text.
 
         :name:
           The collection name.
 
 
         :name:
           The collection name.
 
-        :keep_only:
-          Only save the manifest to keep, do not create a collection record.
+        :create_collection_record:
+           If True, create a collection record on the API server.
+           If False, only commit blocks to Keep and return the manifest text.
 
         :owner_uuid:
           the user, or project uuid that will own this collection.
 
         :owner_uuid:
           the user, or project uuid that will own this collection.
@@ -1274,40 +1519,34 @@ class Collection(SynchronizedCollectionBase):
           if it conflicts with a collection with the same name and owner.  If
           False, a name conflict will result in an error.
 
           if it conflicts with a collection with the same name and owner.  If
           False, a name conflict will result in an error.
 
+        :num_retries:
+          Retry count on API calls (if None,  use the collection default)
+
         """
         self._my_block_manager().commit_all()
         """
         self._my_block_manager().commit_all()
-        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
         text = self.manifest_text(strip=False)
 
         if create_collection_record:
             if name is None:
         text = self.manifest_text(strip=False)
 
         if create_collection_record:
             if name is None:
-                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                name = "New collection"
+                ensure_unique_name = True
 
             body = {"manifest_text": text,
 
             body = {"manifest_text": text,
-                    "name": name}
+                    "name": name,
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
-            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
+            self._portable_data_hash = self._api_response["portable_data_hash"]
 
 
-        self._manifest_text = text
-        self.set_unmodified()
+            self._manifest_text = text
+            self.set_committed(True)
 
 
-    @synchronized
-    def subscribe(self, callback):
-        self.callbacks.append(callback)
-
-    @synchronized
-    def unsubscribe(self, callback):
-        self.callbacks.remove(callback)
-
-    @synchronized
-    def notify(self, event, collection, name, item):
-        for c in self.callbacks:
-            c(event, collection, name, item)
+        return text
 
     @synchronized
     def _import_manifest(self, manifest_text):
 
     @synchronized
     def _import_manifest(self, manifest_text):
@@ -1320,9 +1559,6 @@ class Collection(SynchronizedCollectionBase):
         if len(self) > 0:
             raise ArgumentError("Can only import manifest into an empty collection")
 
         if len(self) > 0:
             raise ArgumentError("Can only import manifest into an empty collection")
 
-        save_sync = self.sync_mode()
-        self._sync = None
-
         STREAM_NAME = 0
         BLOCKS = 1
         SEGMENTS = 2
         STREAM_NAME = 0
         BLOCKS = 1
         SEGMENTS = 2
@@ -1330,9 +1566,9 @@ class Collection(SynchronizedCollectionBase):
         stream_name = None
         state = STREAM_NAME
 
         stream_name = None
         state = STREAM_NAME
 
-        for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
-            tok = n.group(1)
-            sep = n.group(2)
+        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+            tok = token_and_separator.group(1)
+            sep = token_and_separator.group(2)
 
             if state == STREAM_NAME:
                 # starting a new stream
 
             if state == STREAM_NAME:
                 # starting a new stream
@@ -1341,59 +1577,66 @@ class Collection(SynchronizedCollectionBase):
                 segments = []
                 streamoffset = 0L
                 state = BLOCKS
                 segments = []
                 streamoffset = 0L
                 state = BLOCKS
+                self.find_or_create(stream_name, COLLECTION)
                 continue
 
             if state == BLOCKS:
                 continue
 
             if state == BLOCKS:
-                s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
-                if s:
-                    blocksize = long(s.group(1))
-                    blocks.append(Range(tok, streamoffset, blocksize))
+                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                if block_locator:
+                    blocksize = long(block_locator.group(1))
+                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
                     state = SEGMENTS
 
             if state == SEGMENTS:
                     streamoffset += blocksize
                 else:
                     state = SEGMENTS
 
             if state == SEGMENTS:
-                s = re.search(r'^(\d+):(\d+):(\S+)', tok)
-                if s:
-                    pos = long(s.group(1))
-                    size = long(s.group(2))
-                    name = s.group(3).replace('\\040', ' ')
+                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                if file_segment:
+                    pos = long(file_segment.group(1))
+                    size = long(file_segment.group(2))
+                    name = file_segment.group(3).replace('\\040', ' ')
                     filepath = os.path.join(stream_name, name)
                     filepath = os.path.join(stream_name, name)
-                    f = self.find_or_create(filepath, FILE)
-                    if isinstance(f, ArvadosFile):
-                        f.add_segment(blocks, pos, size)
+                    afile = self.find_or_create(filepath, FILE)
+                    if isinstance(afile, ArvadosFile):
+                        afile.add_segment(blocks, pos, size)
                     else:
                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
                     else:
                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
-                    raise errors.SyntaxError("Invalid manifest format")
+                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
 
             if sep == "\n":
                 stream_name = None
                 state = STREAM_NAME
 
 
             if sep == "\n":
                 stream_name = None
                 state = STREAM_NAME
 
-        self.set_unmodified()
-        self._sync = save_sync
+        self.set_committed(True)
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
 
 
 
 
-class Subcollection(SynchronizedCollectionBase):
+class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
-    It falls under the umbrella of the root collection.
+    Subcollection locking falls under the umbrella lock of its root collection.
 
     """
 
 
     """
 
-    def __init__(self, parent):
+    def __init__(self, parent, name):
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
         self._manifest_text = None
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
         self._manifest_text = None
+        self.name = name
+        self.num_retries = parent.num_retries
 
     def root_collection(self):
         return self.parent.root_collection()
 
 
     def root_collection(self):
         return self.parent.root_collection()
 
-    def sync_mode(self):
-        return self.root_collection().sync_mode()
+    def writable(self):
+        return self.root_collection().writable()
 
     def _my_api(self):
         return self.root_collection()._my_api()
 
     def _my_api(self):
         return self.root_collection()._my_api()
@@ -1404,43 +1647,49 @@ class Subcollection(SynchronizedCollectionBase):
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
-    def notify(self, event, collection, name, item):
-        return self.root_collection().notify(event, collection, name, item)
-
     def stream_name(self):
     def stream_name(self):
-        for k, v in self.parent.items():
-            if v is self:
-                return os.path.join(self.parent.stream_name(), k)
-        return '.'
+        return os.path.join(self.parent.stream_name(), self.name)
 
     @synchronized
 
     @synchronized
-    def clone(self, new_parent):
-        c = Subcollection(new_parent)
-        self._cloneinto(c)
+    def clone(self, new_parent, new_name):
+        c = Subcollection(new_parent, new_name)
+        c._clonefrom(self)
         return c
 
         return c
 
+    @must_be_writable
+    @synchronized
+    def _reparent(self, newparent, newname):
+        self.set_committed(False)
+        self.flush()
+        self.parent.remove(self.name, recursive=True)
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock
+
 
 class CollectionReader(Collection):
 
 class CollectionReader(Collection):
-    """A read-only collection object from an api collection record locator,
-    a portable data hash of a manifest, or raw manifest text.
+    """A read-only collection object.
 
 
-    See `Collection` constructor for detailed options.
+    Initialize from an api collection record locator, a portable data hash of a
+    manifest, or raw manifest text.  See `Collection` constructor for detailed
+    options.
 
     """
 
     """
-    def __init__(self, *args, **kwargs):
-        if not args and not kwargs.get("manifest_locator_or_text"):
-            raise errors.ArgumentError("Must provide manifest locator or text to initialize ReadOnlyCollection")
-
-        super(CollectionReader, self).__init__(*args, **kwargs)
+    def __init__(self, manifest_locator_or_text, *args, **kwargs):
+        self._in_init = True
+        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
+        self._in_init = False
 
         # Forego any locking since it should never change once initialized.
         self.lock = NoopLock()
 
         # Forego any locking since it should never change once initialized.
         self.lock = NoopLock()
-        self._sync = SYNC_READONLY
 
         # Backwards compatability with old CollectionReader
         # all_streams() and all_files()
         self._streams = None
 
 
         # Backwards compatability with old CollectionReader
         # all_streams() and all_files()
         self._streams = None
 
+    def writable(self):
+        return self._in_init
+
     def _populate_streams(orig_func):
         @functools.wraps(orig_func)
         def populate_streams_wrapper(self, *args, **kwargs):
     def _populate_streams(orig_func):
         @functools.wraps(orig_func)
         def populate_streams_wrapper(self, *args, **kwargs):