Merge branch '7832-pysdk-use-slots' refs #7832
[arvados.git] / sdk / python / arvados / collection.py
index 3d48652dd53afe4eecc3bc35628646e427a8ac73..38e794c24a217ffa5c76c1a7b026e4d432d369a0 100644 (file)
@@ -487,7 +487,8 @@ class RichCollectionBase(CollectionBase):
 
     def __init__(self, parent=None):
         self.parent = parent
 
     def __init__(self, parent=None):
         self.parent = parent
-        self._modified = True
+        self._committed = False
+        self._callback = None
         self._items = {}
 
     def _my_api(self):
         self._items = {}
 
     def _my_api(self):
@@ -537,24 +538,24 @@ class RichCollectionBase(CollectionBase):
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
-                        item = Subcollection(self)
+                        item = Subcollection(self, pathcomponents[0])
                     else:
                     else:
-                        item = ArvadosFile(self)
+                        item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._items[pathcomponents[0]] = item
-                    self._modified = True
+                    self._committed = False
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
                 if item is None:
                     # create new collection
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
                 if item is None:
                     # create new collection
-                    item = Subcollection(self)
+                    item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._items[pathcomponents[0]] = item
-                    self._modified = True
+                    self._committed = False
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
-                    raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+                    raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
         else:
             return self
 
         else:
             return self
 
@@ -567,7 +568,7 @@ class RichCollectionBase(CollectionBase):
 
         """
         if not path:
 
         """
         if not path:
-            raise errors.ArgumentError("Parameter 'path' must not be empty.")
+            raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
 
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
@@ -580,15 +581,20 @@ class RichCollectionBase(CollectionBase):
                 else:
                     return item
             else:
                 else:
                     return item
             else:
-                raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+                raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
 
 
-    def mkdirs(path):
+    @synchronized
+    def mkdirs(self, path):
         """Recursive subcollection create.
 
         """Recursive subcollection create.
 
-        Like `os.mkdirs()`.  Will create intermediate subcollections needed to
-        contain the leaf subcollection path.
+        Like `os.makedirs()`.  Will create intermediate subcollections needed
+        to contain the leaf subcollection path.
 
         """
 
         """
+
+        if self.find(path) != None:
+            raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
+
         return self.find_or_create(path, COLLECTION)
 
     def open(self, path, mode="r"):
         return self.find_or_create(path, COLLECTION)
 
     def open(self, path, mode="r"):
@@ -615,7 +621,7 @@ class RichCollectionBase(CollectionBase):
         create = (mode != "r")
 
         if create and not self.writable():
         create = (mode != "r")
 
         if create and not self.writable():
-            raise IOError((errno.EROFS, "Collection is read only"))
+            raise IOError(errno.EROFS, "Collection is read only")
 
         if create:
             arvfile = self.find_or_create(path, FILE)
 
         if create:
             arvfile = self.find_or_create(path, FILE)
@@ -623,9 +629,9 @@ class RichCollectionBase(CollectionBase):
             arvfile = self.find(path)
 
         if arvfile is None:
             arvfile = self.find(path)
 
         if arvfile is None:
-            raise IOError((errno.ENOENT, "File not found"))
+            raise IOError(errno.ENOENT, "File not found")
         if not isinstance(arvfile, ArvadosFile):
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError((errno.EISDIR, "Path must refer to a file."))
+            raise IOError(errno.EISDIR, "Is a directory: %s" % path)
 
         if mode[0] == "w":
             arvfile.truncate(0)
 
         if mode[0] == "w":
             arvfile.truncate(0)
@@ -633,26 +639,31 @@ class RichCollectionBase(CollectionBase):
         name = os.path.basename(path)
 
         if mode == "r":
         name = os.path.basename(path)
 
         if mode == "r":
-            return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileReader(arvfile, num_retries=self.num_retries)
         else:
         else:
-            return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
 
 
-    @synchronized
     def modified(self):
     def modified(self):
-        """Test if the collection (or any subcollection or file) has been modified."""
-        if self._modified:
-            return True
-        for k,v in self._items.items():
-            if v.modified():
-                return True
-        return False
+        """Determine if the collection has been modified since last commited."""
+        return not self.committed()
+
+    @synchronized
+    def committed(self):
+        """Determine if the collection has been committed to the API server."""
+
+        if self._committed is False:
+            return False
+        for v in self._items.values():
+            if v.committed() is False:
+                return False
+        return True
 
     @synchronized
 
     @synchronized
-    def set_unmodified(self):
-        """Recursively clear modified flag."""
-        self._modified = False
+    def set_committed(self):
+        """Recursively set committed flag to True."""
+        self._committed = True
         for k,v in self._items.items():
         for k,v in self._items.items():
-            v.set_unmodified()
+            v.set_committed()
 
     @synchronized
     def __iter__(self):
 
     @synchronized
     def __iter__(self):
@@ -683,7 +694,7 @@ class RichCollectionBase(CollectionBase):
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
-        self._modified = True
+        self._committed = False
         self.notify(DEL, self, p, None)
 
     @synchronized
         self.notify(DEL, self, p, None)
 
     @synchronized
@@ -715,33 +726,33 @@ class RichCollectionBase(CollectionBase):
         """
 
         if not path:
         """
 
         if not path:
-            raise errors.ArgumentError("Parameter 'path' must not be empty.")
+            raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
 
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
-            raise IOError((errno.ENOENT, "File not found"))
+            raise IOError(errno.ENOENT, "File not found")
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
+                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
-            self._modified = True
+            self._committed = False
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
 
     def _clonefrom(self, source):
         for k,v in source.items():
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
 
     def _clonefrom(self, source):
         for k,v in source.items():
-            self._items[k] = v.clone(self)
+            self._items[k] = v.clone(self, k)
 
     def clone(self):
         raise NotImplementedError()
 
     @must_be_writable
     @synchronized
 
     def clone(self):
         raise NotImplementedError()
 
     @must_be_writable
     @synchronized
-    def add(self, source_obj, target_name, overwrite=False):
-        """Copy a file or subcollection to this collection.
+    def add(self, source_obj, target_name, overwrite=False, reparent=False):
+        """Copy or move a file or subcollection to this collection.
 
         :source_obj:
           An ArvadosFile, or Subcollection object
 
         :source_obj:
           An ArvadosFile, or Subcollection object
@@ -753,24 +764,74 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
 
         :overwrite:
           Whether to overwrite target file if it already exists.
 
+        :reparent:
+          If True, source_obj will be moved from its parent collection to this collection.
+          If False, source_obj will be copied and the parent collection will be
+          unmodified.
+
         """
 
         if target_name in self and not overwrite:
         """
 
         if target_name in self and not overwrite:
-            raise IOError((errno.EEXIST, "File already exists"))
+            raise IOError(errno.EEXIST, "File already exists")
 
         modified_from = None
         if target_name in self:
             modified_from = self[target_name]
 
 
         modified_from = None
         if target_name in self:
             modified_from = self[target_name]
 
-        # Actually make the copy.
-        dup = source_obj.clone(self)
-        self._items[target_name] = dup
-        self._modified = True
+        # Actually make the move or copy.
+        if reparent:
+            source_obj._reparent(self, target_name)
+            item = source_obj
+        else:
+            item = source_obj.clone(self, target_name)
+
+        self._items[target_name] = item
+        self._committed = False
 
         if modified_from:
 
         if modified_from:
-            self.notify(MOD, self, target_name, (modified_from, dup))
+            self.notify(MOD, self, target_name, (modified_from, item))
         else:
         else:
-            self.notify(ADD, self, target_name, dup)
+            self.notify(ADD, self, target_name, item)
+
+    def _get_src_target(self, source, target_path, source_collection, create_dest):
+        if source_collection is None:
+            source_collection = self
+
+        # Find the object
+        if isinstance(source, basestring):
+            source_obj = source_collection.find(source)
+            if source_obj is None:
+                raise IOError(errno.ENOENT, "File not found")
+            sourcecomponents = source.split("/")
+        else:
+            source_obj = source
+            sourcecomponents = None
+
+        # Find parent collection the target path
+        targetcomponents = target_path.split("/")
+
+        # Determine the name to use.
+        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
+
+        if not target_name:
+            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+
+        if create_dest:
+            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        else:
+            if len(targetcomponents) > 1:
+                target_dir = self.find("/".join(targetcomponents[0:-1]))
+            else:
+                target_dir = self
+
+        if target_dir is None:
+            raise IOError(errno.ENOENT, "Target directory not found.")
+
+        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
+
+        return (source_obj, target_dir, target_name)
 
     @must_be_writable
     @synchronized
 
     @must_be_writable
     @synchronized
@@ -792,42 +853,79 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
         """
         :overwrite:
           Whether to overwrite target file if it already exists.
         """
-        if source_collection is None:
-            source_collection = self
 
 
-        # Find the object to copy
-        if isinstance(source, basestring):
-            source_obj = source_collection.find(source)
-            if source_obj is None:
-                raise IOError((errno.ENOENT, "File not found"))
-            sourcecomponents = source.split("/")
-        else:
-            source_obj = source
-            sourcecomponents = None
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+        target_dir.add(source_obj, target_name, overwrite, False)
 
 
-        # Find parent collection the target path
-        targetcomponents = target_path.split("/")
+    @must_be_writable
+    @synchronized
+    def rename(self, source, target_path, source_collection=None, overwrite=False):
+        """Move a file or subcollection from `source_collection` to a new path in this collection.
 
 
-        # Determine the name to use.
-        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+        :source:
+          A string with a path to source file or subcollection.
 
 
-        if not target_name:
-            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.
 
 
-        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        :source_collection:
+          Collection to copy `source_path` from (default `self`)
 
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
-            target_dir = target_dir[target_name]
-            target_name = sourcecomponents[-1]
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+        """
+
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+        if not source_obj.writable():
+            raise IOError(errno.EROFS, "Source collection is read only.")
+        target_dir.add(source_obj, target_name, overwrite, True)
 
 
-        target_dir.add(source_obj, target_name, overwrite)
+    def portable_manifest_text(self, stream_name="."):
+        """Get the manifest text for this collection, sub collections and files.
+
+        This method does not flush outstanding blocks to Keep.  It will return
+        a normalized manifest with access tokens stripped.
+
+        :stream_name:
+          Name to use for this stream (directory)
+
+        """
+        return self._get_manifest_text(stream_name, True, True)
 
     @synchronized
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
 
     @synchronized
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
+        This method will flush outstanding blocks to Keep.  By default, it will
+        not normalize an unmodified manifest or strip access tokens.
+
+        :stream_name:
+          Name to use for this stream (directory)
+
+        :strip:
+          If True, remove signing tokens from block locators if present.
+          If False (default), block locators are left unchanged.
+
+        :normalize:
+          If True, always export the manifest text in normalized form
+          even if the Collection is not modified.  If False (default) and the collection
+          is not modified, return the original manifest text even if it is not
+          in normalized form.
+
+        """
+
+        self._my_block_manager().commit_all()
+        return self._get_manifest_text(stream_name, strip, normalize)
+
+    @synchronized
+    def _get_manifest_text(self, stream_name, strip, normalize):
+        """Get the manifest text for this collection, sub collections and files.
+
         :stream_name:
         :stream_name:
-          Name of the stream (directory)
+          Name to use for this stream (directory)
 
         :strip:
           If True, remove signing tokens from block locators if present.
 
         :strip:
           If True, remove signing tokens from block locators if present.
@@ -841,7 +939,7 @@ class RichCollectionBase(CollectionBase):
 
         """
 
 
         """
 
-        if self.modified() or self._manifest_text is None or normalize:
+        if not self.committed() or self._manifest_text is None or normalize:
             stream = {}
             buf = []
             sorted_keys = sorted(self.keys())
             stream = {}
             buf = []
             sorted_keys = sorted(self.keys())
@@ -861,7 +959,7 @@ class RichCollectionBase(CollectionBase):
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
-                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
+                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
             return "".join(buf)
         else:
             if strip:
             return "".join(buf)
         else:
             if strip:
@@ -881,15 +979,15 @@ class RichCollectionBase(CollectionBase):
             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
         for k in self:
             if k not in end_collection:
             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
         for k in self:
             if k not in end_collection:
-               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
+               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
         for k in end_collection:
             if k in self:
                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
         for k in end_collection:
             if k in self:
                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
-                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
+                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
             else:
-                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
+                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
 
     @must_be_writable
         return changes
 
     @must_be_writable
@@ -901,12 +999,14 @@ class RichCollectionBase(CollectionBase):
         alternate path indicating the conflict.
 
         """
         alternate path indicating the conflict.
 
         """
+        if changes:
+            self._committed = False
         for change in changes:
             event_type = change[0]
             path = change[1]
             initial = change[2]
             local = self.find(path)
         for change in changes:
             event_type = change[0]
             path = change[1]
             initial = change[2]
             local = self.find(path)
-            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
+            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                     time.gmtime()))
             if event_type == ADD:
                 if local is None:
                                                                     time.gmtime()))
             if event_type == ADD:
                 if local is None:
@@ -941,9 +1041,27 @@ class RichCollectionBase(CollectionBase):
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
-        stripped = self.manifest_text(strip=True)
+        stripped = self.portable_manifest_text()
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
+    @synchronized
+    def subscribe(self, callback):
+        if self._callback is None:
+            self._callback = callback
+        else:
+            raise errors.ArgumentError("A callback is already set on this collection.")
+
+    @synchronized
+    def unsubscribe(self):
+        if self._callback is not None:
+            self._callback = None
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+        self.root_collection().notify(event, collection, name, item)
+
     @synchronized
     def __eq__(self, other):
         if other is self:
     @synchronized
     def __eq__(self, other):
         if other is self:
@@ -962,6 +1080,12 @@ class RichCollectionBase(CollectionBase):
     def __ne__(self, other):
         return not self.__eq__(other)
 
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def flush(self):
+        """Flush bufferblocks to Keep."""
+        for e in self.values():
+            e.flush()
+
 
 class Collection(RichCollectionBase):
     """Represents the root of an Arvados Collection.
 
 class Collection(RichCollectionBase):
     """Represents the root of an Arvados Collection.
@@ -1044,9 +1168,9 @@ class Collection(RichCollectionBase):
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
+        self._past_versions = set()
 
         self.lock = threading.RLock()
 
         self.lock = threading.RLock()
-        self.callbacks = []
         self.events = None
 
         if manifest_locator_or_text:
         self.events = None
 
         if manifest_locator_or_text:
@@ -1058,7 +1182,7 @@ class Collection(RichCollectionBase):
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
-                    "Argument to CollectionReader must be a manifest or a collection UUID")
+                    "Argument to CollectionReader is not a manifest or a collection UUID")
 
             try:
                 self._populate()
 
             try:
                 self._populate()
@@ -1074,6 +1198,10 @@ class Collection(RichCollectionBase):
     def writable(self):
         return True
 
     def writable(self):
         return True
 
+    @synchronized
+    def known_past_version(self, modified_at_and_portable_data_hash):
+        return modified_at_and_portable_data_hash in self._past_versions
+
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
@@ -1083,9 +1211,15 @@ class Collection(RichCollectionBase):
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            if self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))):
+                # We've merged this record this before.  Don't do anything.
+                return
+            else:
+                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
+        self._manifest_text = self.manifest_text()
 
     @synchronized
     def _my_api(self):
 
     @synchronized
     def _my_api(self):
@@ -1109,6 +1243,10 @@ class Collection(RichCollectionBase):
             self._block_manager = _BlockManager(self._my_keep())
         return self._block_manager
 
             self._block_manager = _BlockManager(self._my_keep())
         return self._block_manager
 
+    def _remember_api_response(self, response):
+        self._api_response = response
+        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -1118,9 +1256,9 @@ class Collection(RichCollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            self._api_response = self._my_api().collections().get(
+            self._remember_api_response(self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
                 uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries)
+                    num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
             return None
         except Exception as e:
             self._manifest_text = self._api_response['manifest_text']
             return None
         except Exception as e:
@@ -1179,9 +1317,12 @@ class Collection(RichCollectionBase):
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
-        if exc_type is not None:
+        if exc_type is None:
             if self.writable() and self._has_collection_uuid():
                 self.save()
             if self.writable() and self._has_collection_uuid():
                 self.save()
+        self.stop_threads()
+
+    def stop_threads(self):
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
@@ -1200,7 +1341,7 @@ class Collection(RichCollectionBase):
         return self._manifest_locator
 
     @synchronized
         return self._manifest_locator
 
     @synchronized
-    def clone(self, new_parent=None, readonly=False, new_config=None):
+    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
         if new_config is None:
             new_config = self._config
         if readonly:
         if new_config is None:
             new_config = self._config
         if readonly:
@@ -1249,8 +1390,8 @@ class Collection(RichCollectionBase):
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
-        merge=True, the default), write the manifest to Keep, and update the
-        collection record.
+        merge=True, the default), and update the collection record.  Returns
+        the current manifest text.
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
@@ -1264,40 +1405,48 @@ class Collection(RichCollectionBase):
           Retry count on API calls (if None,  use the collection default)
 
         """
           Retry count on API calls (if None,  use the collection default)
 
         """
-        if self.modified():
+        if not self.committed():
             if not self._has_collection_uuid():
             if not self._has_collection_uuid():
-                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
+                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
+
             self._my_block_manager().commit_all()
             self._my_block_manager().commit_all()
+
             if merge:
                 self.update()
             if merge:
                 self.update()
-            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
 
             text = self.manifest_text(strip=False)
 
             text = self.manifest_text(strip=False)
-            self._api_response = self._my_api().collections().update(
+            self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
-                    num_retries=num_retries)
+                    num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
             self._manifest_text = self._api_response["manifest_text"]
-            self.set_unmodified()
+            self.set_committed()
+
+        return self._manifest_text
 
 
     @must_be_writable
     @synchronized
     @retry_method
 
 
     @must_be_writable
     @synchronized
     @retry_method
-    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
+    def save_new(self, name=None,
+                 create_collection_record=True,
+                 owner_uuid=None,
+                 ensure_unique_name=False,
+                 num_retries=None):
         """Save collection to a new collection record.
 
         """Save collection to a new collection record.
 
-        Commit pending buffer blocks to Keep, write the manifest to Keep, and
-        create a new collection record (if create_collection_record True).
-        After creating a new collection record, this Collection object will be
-        associated with the new record used by `save()`.
+        Commit pending buffer blocks to Keep and, when create_collection_record
+        is True (default), create a new collection record.  After creating a
+        new collection record, this Collection object will be associated with
+        the new record used by `save()`.  Returns the current manifest text.
 
         :name:
           The collection name.
 
         :create_collection_record:
 
         :name:
           The collection name.
 
         :create_collection_record:
-          If True, create a collection record.  If False, only save the manifest to keep.
+           If True, create a collection record on the API server.
+           If False, only commit blocks to Keep and return the manifest text.
 
         :owner_uuid:
           the user, or project uuid that will own this collection.
 
         :owner_uuid:
           the user, or project uuid that will own this collection.
@@ -1313,38 +1462,27 @@ class Collection(RichCollectionBase):
 
         """
         self._my_block_manager().commit_all()
 
         """
         self._my_block_manager().commit_all()
-        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
         text = self.manifest_text(strip=False)
 
         if create_collection_record:
             if name is None:
         text = self.manifest_text(strip=False)
 
         if create_collection_record:
             if name is None:
-                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                name = "New collection"
+                ensure_unique_name = True
 
             body = {"manifest_text": text,
                     "name": name}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
 
             body = {"manifest_text": text,
                     "name": name}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
-            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
 
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
 
-        self._manifest_text = text
-        self.set_unmodified()
-
-    @synchronized
-    def subscribe(self, callback):
-        self.callbacks.append(callback)
+            self._manifest_text = text
+            self.set_committed()
 
 
-    @synchronized
-    def unsubscribe(self, callback):
-        self.callbacks.remove(callback)
-
-    @synchronized
-    def notify(self, event, collection, name, item):
-        for c in self.callbacks:
-            c(event, collection, name, item)
+        return text
 
     @synchronized
     def _import_manifest(self, manifest_text):
 
     @synchronized
     def _import_manifest(self, manifest_text):
@@ -1375,13 +1513,14 @@ class Collection(RichCollectionBase):
                 segments = []
                 streamoffset = 0L
                 state = BLOCKS
                 segments = []
                 streamoffset = 0L
                 state = BLOCKS
+                self.find_or_create(stream_name, COLLECTION)
                 continue
 
             if state == BLOCKS:
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
                     blocksize = long(block_locator.group(1))
                 continue
 
             if state == BLOCKS:
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
                     blocksize = long(block_locator.group(1))
-                    blocks.append(Range(tok, streamoffset, blocksize))
+                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
                     state = SEGMENTS
                     streamoffset += blocksize
                 else:
                     state = SEGMENTS
@@ -1400,27 +1539,34 @@ class Collection(RichCollectionBase):
                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
-                    raise errors.SyntaxError("Invalid manifest format")
+                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
 
             if sep == "\n":
                 stream_name = None
                 state = STREAM_NAME
 
 
             if sep == "\n":
                 stream_name = None
                 state = STREAM_NAME
 
-        self.set_unmodified()
+        self.set_committed()
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
 
 
 class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
 
 
 class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
-    It falls under the umbrella of the root collection.
+    Subcollection locking falls under the umbrella lock of its root collection.
 
     """
 
 
     """
 
-    def __init__(self, parent):
+    def __init__(self, parent, name):
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
         self._manifest_text = None
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
         self._manifest_text = None
+        self.name = name
+        self.num_retries = parent.num_retries
 
     def root_collection(self):
         return self.parent.root_collection()
 
     def root_collection(self):
         return self.parent.root_collection()
@@ -1437,21 +1583,25 @@ class Subcollection(RichCollectionBase):
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
-    def notify(self, event, collection, name, item):
-        return self.root_collection().notify(event, collection, name, item)
-
     def stream_name(self):
     def stream_name(self):
-        for k, v in self.parent.items():
-            if v is self:
-                return os.path.join(self.parent.stream_name(), k)
-        return '.'
+        return os.path.join(self.parent.stream_name(), self.name)
 
     @synchronized
 
     @synchronized
-    def clone(self, new_parent):
-        c = Subcollection(new_parent)
+    def clone(self, new_parent, new_name):
+        c = Subcollection(new_parent, new_name)
         c._clonefrom(self)
         return c
 
         c._clonefrom(self)
         return c
 
+    @must_be_writable
+    @synchronized
+    def _reparent(self, newparent, newname):
+        self._committed = False
+        self.flush()
+        self.parent.remove(self.name, recursive=True)
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock
+
 
 class CollectionReader(Collection):
     """A read-only collection object.
 
 class CollectionReader(Collection):
     """A read-only collection object.