8460: Use new ws server for integration tests only if ARVADOS_TEST_EXPERIMENTAL_WS...
[arvados.git] / sdk / python / arvados / collection.py
index cb090ec767b8744e3df900fba6ab20069e42e55c..27aad033ae55523de4fd14ae7bf9cf8a7d2b654f 100644 (file)
@@ -307,7 +307,8 @@ class CollectionWriter(CollectionBase):
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace")
+                "Manifest stream names cannot contain whitespace: '%s'" %
+                (newstreamname))
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
@@ -474,6 +475,7 @@ class ResumableCollectionWriter(CollectionWriter):
 ADD = "add"
 DEL = "del"
 MOD = "mod"
 ADD = "add"
 DEL = "del"
 MOD = "mod"
+TOK = "tok"
 FILE = "file"
 COLLECTION = "collection"
 
 FILE = "file"
 COLLECTION = "collection"
 
@@ -487,7 +489,7 @@ class RichCollectionBase(CollectionBase):
 
     def __init__(self, parent=None):
         self.parent = parent
 
     def __init__(self, parent=None):
         self.parent = parent
-        self._modified = True
+        self._committed = False
         self._callback = None
         self._items = {}
 
         self._callback = None
         self._items = {}
 
@@ -542,7 +544,7 @@ class RichCollectionBase(CollectionBase):
                     else:
                         item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     else:
                         item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._modified = True
+                    self._committed = False
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
@@ -550,12 +552,12 @@ class RichCollectionBase(CollectionBase):
                     # create new collection
                     item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     # create new collection
                     item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._modified = True
+                    self._committed = False
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
-                    raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
+                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
         else:
             return self
 
         else:
             return self
 
@@ -581,15 +583,20 @@ class RichCollectionBase(CollectionBase):
                 else:
                     return item
             else:
                 else:
                     return item
             else:
-                raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
+                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 
 
+    @synchronized
     def mkdirs(self, path):
         """Recursive subcollection create.
 
     def mkdirs(self, path):
         """Recursive subcollection create.
 
-        Like `os.mkdirs()`.  Will create intermediate subcollections needed to
-        contain the leaf subcollection path.
+        Like `os.makedirs()`.  Will create intermediate subcollections needed
+        to contain the leaf subcollection path.
 
         """
 
         """
+
+        if self.find(path) != None:
+            raise IOError(errno.EEXIST, "Directory or file exists", path)
+
         return self.find_or_create(path, COLLECTION)
 
     def open(self, path, mode="r"):
         return self.find_or_create(path, COLLECTION)
 
     def open(self, path, mode="r"):
@@ -624,9 +631,9 @@ class RichCollectionBase(CollectionBase):
             arvfile = self.find(path)
 
         if arvfile is None:
             arvfile = self.find(path)
 
         if arvfile is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if not isinstance(arvfile, ArvadosFile):
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError(errno.EISDIR, "Is a directory: %s" % path)
+            raise IOError(errno.EISDIR, "Is a directory", path)
 
         if mode[0] == "w":
             arvfile.truncate(0)
 
         if mode[0] == "w":
             arvfile.truncate(0)
@@ -634,26 +641,31 @@ class RichCollectionBase(CollectionBase):
         name = os.path.basename(path)
 
         if mode == "r":
         name = os.path.basename(path)
 
         if mode == "r":
-            return ArvadosFileReader(arvfile, mode, num_retries=self.num_retries)
+            return ArvadosFileReader(arvfile, num_retries=self.num_retries)
         else:
             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
 
         else:
             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
 
-    @synchronized
     def modified(self):
     def modified(self):
-        """Test if the collection (or any subcollection or file) has been modified."""
-        if self._modified:
-            return True
+        """Determine if the collection has been modified since last committed."""
+        return not self.committed()
+
+    @synchronized
+    def committed(self):
+        """Determine if the collection has been committed to the API server."""
+
+        if self._committed is False:
+            return False
         for v in self._items.values():
         for v in self._items.values():
-            if v.modified():
-                return True
-        return False
+            if v.committed() is False:
+                return False
+        return True
 
     @synchronized
 
     @synchronized
-    def set_unmodified(self):
-        """Recursively clear modified flag."""
-        self._modified = False
+    def set_committed(self):
+        """Recursively set committed flag to True."""
+        self._committed = True
         for k,v in self._items.items():
         for k,v in self._items.items():
-            v.set_unmodified()
+            v.set_committed()
 
     @synchronized
     def __iter__(self):
 
     @synchronized
     def __iter__(self):
@@ -684,7 +696,7 @@ class RichCollectionBase(CollectionBase):
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
-        self._modified = True
+        self._committed = False
         self.notify(DEL, self, p, None)
 
     @synchronized
         self.notify(DEL, self, p, None)
 
     @synchronized
@@ -721,13 +733,13 @@ class RichCollectionBase(CollectionBase):
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
+                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
-            self._modified = True
+            self._committed = False
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
@@ -762,7 +774,7 @@ class RichCollectionBase(CollectionBase):
         """
 
         if target_name in self and not overwrite:
         """
 
         if target_name in self and not overwrite:
-            raise IOError(errno.EEXIST, "File already exists")
+            raise IOError(errno.EEXIST, "File already exists", target_name)
 
         modified_from = None
         if target_name in self:
 
         modified_from = None
         if target_name in self:
@@ -776,7 +788,7 @@ class RichCollectionBase(CollectionBase):
             item = source_obj.clone(self, target_name)
 
         self._items[target_name] = item
             item = source_obj.clone(self, target_name)
 
         self._items[target_name] = item
-        self._modified = True
+        self._committed = False
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -791,7 +803,7 @@ class RichCollectionBase(CollectionBase):
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
-                raise IOError(errno.ENOENT, "File not found")
+                raise IOError(errno.ENOENT, "File not found", source)
             sourcecomponents = source.split("/")
         else:
             source_obj = source
             sourcecomponents = source.split("/")
         else:
             source_obj = source
@@ -815,7 +827,7 @@ class RichCollectionBase(CollectionBase):
                 target_dir = self
 
         if target_dir is None:
                 target_dir = self
 
         if target_dir is None:
-            raise IOError(errno.ENOENT, "Target directory not found.")
+            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 
         if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
 
         if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
@@ -870,7 +882,7 @@ class RichCollectionBase(CollectionBase):
 
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
         if not source_obj.writable():
 
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
         if not source_obj.writable():
-            raise IOError(errno.EROFS, "Source collection is read only.")
+            raise IOError(errno.EROFS, "Source collection is read only", source)
         target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
         target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
@@ -885,6 +897,7 @@ class RichCollectionBase(CollectionBase):
         """
         return self._get_manifest_text(stream_name, True, True)
 
         """
         return self._get_manifest_text(stream_name, True, True)
 
+    @synchronized
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
@@ -910,7 +923,7 @@ class RichCollectionBase(CollectionBase):
         return self._get_manifest_text(stream_name, strip, normalize)
 
     @synchronized
         return self._get_manifest_text(stream_name, strip, normalize)
 
     @synchronized
-    def _get_manifest_text(self, stream_name, strip, normalize):
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
@@ -926,9 +939,12 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """
 
         """
 
-        if self.modified() or self._manifest_text is None or normalize:
+        if not self.committed() or self._manifest_text is None or normalize:
             stream = {}
             buf = []
             sorted_keys = sorted(self.keys())
             stream = {}
             buf = []
             sorted_keys = sorted(self.keys())
@@ -939,6 +955,8 @@ class RichCollectionBase(CollectionBase):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -975,6 +993,8 @@ class RichCollectionBase(CollectionBase):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+                else:
+                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
             else:
                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
@@ -989,13 +1009,13 @@ class RichCollectionBase(CollectionBase):
 
         """
         if changes:
 
         """
         if changes:
-            self._modified = True
+            self._committed = False
         for change in changes:
             event_type = change[0]
             path = change[1]
             initial = change[2]
             local = self.find(path)
         for change in changes:
             event_type = change[0]
             path = change[1]
             initial = change[2]
             local = self.find(path)
-            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
+            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                     time.gmtime()))
             if event_type == ADD:
                 if local is None:
                                                                     time.gmtime()))
             if event_type == ADD:
                 if local is None:
@@ -1005,7 +1025,7 @@ class RichCollectionBase(CollectionBase):
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
-            elif event_type == MOD:
+            elif event_type == MOD or event_type == TOK:
                 final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
                 final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
@@ -1121,7 +1141,8 @@ class Collection(RichCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1129,24 +1150,35 @@ class Collection(RichCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
 
         if apiconfig:
             self._config = apiconfig
 
         if apiconfig:
             self._config = apiconfig
@@ -1157,6 +1189,7 @@ class Collection(RichCollectionBase):
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
+        self._past_versions = set()
 
         self.lock = threading.RLock()
         self.events = None
 
         self.lock = threading.RLock()
         self.events = None
@@ -1186,6 +1219,10 @@ class Collection(RichCollectionBase):
     def writable(self):
         return True
 
     def writable(self):
         return True
 
+    @synchronized
+    def known_past_version(self, modified_at_and_portable_data_hash):
+        return modified_at_and_portable_data_hash in self._past_versions
+
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
@@ -1195,6 +1232,15 @@ class Collection(RichCollectionBase):
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
+                response.get("portable_data_hash") != self.portable_data_hash()):
+                # The record on the server is different from our current one, but we've seen it before,
+                # so ignore it because it's already been merged.
+                # However, if it's the same as our current record, proceed with the update, because we want to update
+                # our tokens.
+                return
+            else:
+                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
@@ -1204,7 +1250,8 @@ class Collection(RichCollectionBase):
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
         return self._api_client
 
     @synchronized
@@ -1219,9 +1266,16 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies)
         return self._block_manager
 
         return self._block_manager
 
+    def _remember_api_response(self, response):
+        self._api_response = response
+        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -1231,10 +1285,14 @@ class Collection(RichCollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            self._api_response = self._my_api().collections().get(
+            self._remember_api_response(self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
                 uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries)
+                    num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
             self._manifest_text = self._api_response['manifest_text']
+            # If not overridden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
             return None
         except Exception as e:
             return e
@@ -1295,6 +1353,9 @@ class Collection(RichCollectionBase):
         if exc_type is None:
             if self.writable() and self._has_collection_uuid():
                 self.save()
         if exc_type is None:
             if self.writable() and self._has_collection_uuid():
                 self.save()
+        self.stop_threads()
+
+    def stop_threads(self):
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
@@ -1377,7 +1438,7 @@ class Collection(RichCollectionBase):
           Retry count on API calls (if None,  use the collection default)
 
         """
           Retry count on API calls (if None,  use the collection default)
 
         """
-        if self.modified():
+        if not self.committed():
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
 
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
 
@@ -1387,13 +1448,13 @@ class Collection(RichCollectionBase):
                 self.update()
 
             text = self.manifest_text(strip=False)
                 self.update()
 
             text = self.manifest_text(strip=False)
-            self._api_response = self._my_api().collections().update(
+            self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
-                    num_retries=num_retries)
+                    num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
             self._manifest_text = self._api_response["manifest_text"]
-            self.set_unmodified()
+            self.set_committed()
 
         return self._manifest_text
 
 
         return self._manifest_text
 
@@ -1438,21 +1499,22 @@ class Collection(RichCollectionBase):
 
         if create_collection_record:
             if name is None:
 
         if create_collection_record:
             if name is None:
-                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                name = "New collection"
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
-                    "name": name}
+                    "name": name,
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
-            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
 
             self._manifest_text = text
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
 
             self._manifest_text = text
-            self.set_unmodified()
+            self.set_committed()
 
         return text
 
 
         return text
 
@@ -1485,7 +1547,7 @@ class Collection(RichCollectionBase):
                 segments = []
                 streamoffset = 0L
                 state = BLOCKS
                 segments = []
                 streamoffset = 0L
                 state = BLOCKS
-                self.mkdirs(stream_name)
+                self.find_or_create(stream_name, COLLECTION)
                 continue
 
             if state == BLOCKS:
                 continue
 
             if state == BLOCKS:
@@ -1511,13 +1573,13 @@ class Collection(RichCollectionBase):
                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
-                    raise errors.SyntaxError("Invalid manifest format")
+                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
 
             if sep == "\n":
                 stream_name = None
                 state = STREAM_NAME
 
 
             if sep == "\n":
                 stream_name = None
                 state = STREAM_NAME
 
-        self.set_unmodified()
+        self.set_committed()
 
     @synchronized
     def notify(self, event, collection, name, item):
 
     @synchronized
     def notify(self, event, collection, name, item):
@@ -1529,7 +1591,7 @@ class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
-    It falls under the umbrella of the root collection.
+    Subcollection locking falls under the umbrella lock of its root collection.
 
     """
 
 
     """
 
@@ -1567,7 +1629,7 @@ class Subcollection(RichCollectionBase):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
-        self._modified = True
+        self._committed = False
         self.flush()
         self.parent.remove(self.name, recursive=True)
         self.parent = newparent
         self.flush()
         self.parent.remove(self.name, recursive=True)
         self.parent = newparent