10111: Merge branch 'master' into 10111-cr-provenance-graph
[arvados.git] / sdk / python / arvados / collection.py
index 70341d8d68538ac83d31cb30f85d8e94f76e86d5..f26d3a3d27c0b221d269d3d4a1beb8775268f7e6 100644 (file)
@@ -307,7 +307,8 @@ class CollectionWriter(CollectionBase):
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace")
+                "Manifest stream names cannot contain whitespace: '%s'" %
+                (newstreamname))
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
@@ -474,6 +475,7 @@ class ResumableCollectionWriter(CollectionWriter):
 ADD = "add"
 DEL = "del"
 MOD = "mod"
 ADD = "add"
 DEL = "del"
 MOD = "mod"
+TOK = "tok"
 FILE = "file"
 COLLECTION = "collection"
 
 FILE = "file"
 COLLECTION = "collection"
 
@@ -542,7 +544,7 @@ class RichCollectionBase(CollectionBase):
                     else:
                         item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     else:
                         item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._committed = False
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
@@ -550,12 +552,12 @@ class RichCollectionBase(CollectionBase):
                     # create new collection
                     item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     # create new collection
                     item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._committed = False
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
-                    raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
+                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
         else:
             return self
 
         else:
             return self
 
@@ -563,16 +565,23 @@ class RichCollectionBase(CollectionBase):
     def find(self, path):
         """Recursively search the specified file path.
 
     def find(self, path):
         """Recursively search the specified file path.
 
-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
         found.
+        If path is invalid (ex: starts with '/'), an IOError exception will be
+        raised.
 
         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
 
         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
         item = self._items.get(pathcomponents[0])
         item = self._items.get(pathcomponents[0])
-        if len(pathcomponents) == 1:
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
             return item
         else:
             if isinstance(item, RichCollectionBase):
             return item
         else:
             if isinstance(item, RichCollectionBase):
@@ -581,7 +590,7 @@ class RichCollectionBase(CollectionBase):
                 else:
                     return item
             else:
                 else:
                     return item
             else:
-                raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
+                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 
     @synchronized
     def mkdirs(self, path):
 
     @synchronized
     def mkdirs(self, path):
@@ -593,7 +602,7 @@ class RichCollectionBase(CollectionBase):
         """
 
         if self.find(path) != None:
         """
 
         if self.find(path) != None:
-            raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
+            raise IOError(errno.EEXIST, "Directory or file exists", path)
 
         return self.find_or_create(path, COLLECTION)
 
 
         return self.find_or_create(path, COLLECTION)
 
@@ -629,9 +638,9 @@ class RichCollectionBase(CollectionBase):
             arvfile = self.find(path)
 
         if arvfile is None:
             arvfile = self.find(path)
 
         if arvfile is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if not isinstance(arvfile, ArvadosFile):
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError(errno.EISDIR, "Is a directory: %s" % path)
+            raise IOError(errno.EISDIR, "Is a directory", path)
 
         if mode[0] == "w":
             arvfile.truncate(0)
 
         if mode[0] == "w":
             arvfile.truncate(0)
@@ -650,20 +659,26 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def committed(self):
         """Determine if the collection has been committed to the API server."""
     @synchronized
     def committed(self):
         """Determine if the collection has been committed to the API server."""
-
-        if self._committed is False:
-            return False
-        for v in self._items.values():
-            if v.committed() is False:
-                return False
-        return True
+        return self._committed
 
     @synchronized
 
     @synchronized
-    def set_committed(self):
-        """Recursively set committed flag to True."""
-        self._committed = True
-        for k,v in self._items.items():
-            v.set_committed()
+    def set_committed(self, value=True):
+        """Recursively set committed flag.
+
+        If value is True, set committed to be True for this and all children.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        if value:
+            for k,v in self._items.items():
+                v.set_committed(True)
+            self._committed = True
+        else:
+            self._committed = False
+            if self.parent is not None:
+                self.parent.set_committed(False)
 
     @synchronized
     def __iter__(self):
 
     @synchronized
     def __iter__(self):
@@ -694,7 +709,7 @@ class RichCollectionBase(CollectionBase):
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
-        self._committed = False
+        self.set_committed(False)
         self.notify(DEL, self, p, None)
 
     @synchronized
         self.notify(DEL, self, p, None)
 
     @synchronized
@@ -731,13 +746,13 @@ class RichCollectionBase(CollectionBase):
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
+                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
-            self._committed = False
+            self.set_committed(False)
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
@@ -772,7 +787,7 @@ class RichCollectionBase(CollectionBase):
         """
 
         if target_name in self and not overwrite:
         """
 
         if target_name in self and not overwrite:
-            raise IOError(errno.EEXIST, "File already exists")
+            raise IOError(errno.EEXIST, "File already exists", target_name)
 
         modified_from = None
         if target_name in self:
 
         modified_from = None
         if target_name in self:
@@ -786,7 +801,7 @@ class RichCollectionBase(CollectionBase):
             item = source_obj.clone(self, target_name)
 
         self._items[target_name] = item
             item = source_obj.clone(self, target_name)
 
         self._items[target_name] = item
-        self._committed = False
+        self.set_committed(False)
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -801,7 +816,7 @@ class RichCollectionBase(CollectionBase):
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
-                raise IOError(errno.ENOENT, "File not found")
+                raise IOError(errno.ENOENT, "File not found", source)
             sourcecomponents = source.split("/")
         else:
             source_obj = source
             sourcecomponents = source.split("/")
         else:
             source_obj = source
@@ -825,9 +840,9 @@ class RichCollectionBase(CollectionBase):
                 target_dir = self
 
         if target_dir is None:
                 target_dir = self
 
         if target_dir is None:
-            raise IOError(errno.ENOENT, "Target directory not found.")
+            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
             target_name = sourcecomponents[-1]
 
             target_dir = target_dir[target_name]
             target_name = sourcecomponents[-1]
 
@@ -880,7 +895,7 @@ class RichCollectionBase(CollectionBase):
 
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
         if not source_obj.writable():
 
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
         if not source_obj.writable():
-            raise IOError(errno.EROFS, "Source collection is read only.")
+            raise IOError(errno.EROFS, "Source collection is read only", source)
         target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
         target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
@@ -896,7 +911,8 @@ class RichCollectionBase(CollectionBase):
         return self._get_manifest_text(stream_name, True, True)
 
     @synchronized
         return self._get_manifest_text(stream_name, True, True)
 
     @synchronized
-    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+    def manifest_text(self, stream_name=".", strip=False, normalize=False,
+                      only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         This method will flush outstanding blocks to Keep.  By default, it will
         """Get the manifest text for this collection, sub collections and files.
 
         This method will flush outstanding blocks to Keep.  By default, it will
@@ -915,13 +931,18 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, don't commit pending blocks.
+
         """
 
         """
 
-        self._my_block_manager().commit_all()
-        return self._get_manifest_text(stream_name, strip, normalize)
+        if not only_committed:
+            self._my_block_manager().commit_all()
+        return self._get_manifest_text(stream_name, strip, normalize,
+                                       only_committed=only_committed)
 
     @synchronized
 
     @synchronized
-    def _get_manifest_text(self, stream_name, strip, normalize):
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
@@ -937,6 +958,9 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """
 
         if not self.committed() or self._manifest_text is None or normalize:
         """
 
         if not self.committed() or self._manifest_text is None or normalize:
@@ -950,6 +974,8 @@ class RichCollectionBase(CollectionBase):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -959,7 +985,7 @@ class RichCollectionBase(CollectionBase):
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
-                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
             return "".join(buf)
         else:
             if strip:
             return "".join(buf)
         else:
             if strip:
@@ -986,6 +1012,8 @@ class RichCollectionBase(CollectionBase):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+                else:
+                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
             else:
                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
@@ -1000,7 +1028,7 @@ class RichCollectionBase(CollectionBase):
 
         """
         if changes:
 
         """
         if changes:
-            self._committed = False
+            self.set_committed(False)
         for change in changes:
             event_type = change[0]
             path = change[1]
         for change in changes:
             event_type = change[0]
             path = change[1]
@@ -1016,7 +1044,7 @@ class RichCollectionBase(CollectionBase):
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
-            elif event_type == MOD:
+            elif event_type == MOD or event_type == TOK:
                 final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
                 final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
@@ -1041,8 +1069,13 @@ class RichCollectionBase(CollectionBase):
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
-        stripped = self.portable_manifest_text()
-        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+        if self._manifest_locator and self.committed():
+            # If the collection is already saved on the API server, and it's committed
+            # then return API server's PDH response.
+            return self._portable_data_hash
+        else:
+            stripped = self.portable_manifest_text()
+            return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
     @synchronized
     def subscribe(self, callback):
 
     @synchronized
     def subscribe(self, callback):
@@ -1132,7 +1165,9 @@ class Collection(RichCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None,
+                 put_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1140,24 +1175,36 @@ class Collection(RichCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
+        self.put_threads = put_threads
 
         if apiconfig:
             self._config = apiconfig
 
         if apiconfig:
             self._config = apiconfig
@@ -1167,7 +1214,9 @@ class Collection(RichCollectionBase):
         self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
         self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
+        self._portable_data_hash = None
         self._api_response = None
         self._api_response = None
+        self._past_versions = set()
 
         self.lock = threading.RLock()
         self.events = None
 
         self.lock = threading.RLock()
         self.events = None
@@ -1197,6 +1246,10 @@ class Collection(RichCollectionBase):
     def writable(self):
         return True
 
     def writable(self):
         return True
 
+    @synchronized
+    def known_past_version(self, modified_at_and_portable_data_hash):
+        return modified_at_and_portable_data_hash in self._past_versions
+
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
@@ -1206,6 +1259,15 @@ class Collection(RichCollectionBase):
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
+                response.get("portable_data_hash") != self.portable_data_hash()):
+                # The record on the server is different from our current one, but we've seen it before,
+                # so ignore it because it's already been merged.
+                # However, if it's the same as our current record, proceed with the update, because we want to update
+                # our tokens.
+                return
+            else:
+                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
@@ -1215,7 +1277,8 @@ class Collection(RichCollectionBase):
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
         return self._api_client
 
     @synchronized
@@ -1230,9 +1293,16 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
         return self._block_manager
 
         return self._block_manager
 
+    def _remember_api_response(self, response):
+        self._api_response = response
+        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -1242,10 +1312,15 @@ class Collection(RichCollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            self._api_response = self._my_api().collections().get(
+            self._remember_api_response(self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
                 uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries)
+                    num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
             self._manifest_text = self._api_response['manifest_text']
+            self._portable_data_hash = self._api_response['portable_data_hash']
+            # If not overriden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
             return None
         except Exception as e:
             return e
@@ -1401,13 +1476,14 @@ class Collection(RichCollectionBase):
                 self.update()
 
             text = self.manifest_text(strip=False)
                 self.update()
 
             text = self.manifest_text(strip=False)
-            self._api_response = self._my_api().collections().update(
+            self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
-                    num_retries=num_retries)
+                    num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
             self._manifest_text = self._api_response["manifest_text"]
-            self.set_committed()
+            self._portable_data_hash = self._api_response["portable_data_hash"]
+            self.set_committed(True)
 
         return self._manifest_text
 
 
         return self._manifest_text
 
@@ -1452,21 +1528,23 @@ class Collection(RichCollectionBase):
 
         if create_collection_record:
             if name is None:
 
         if create_collection_record:
             if name is None:
-                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                name = "New collection"
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
-                    "name": name}
+                    "name": name,
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
-            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
+            self._portable_data_hash = self._api_response["portable_data_hash"]
 
             self._manifest_text = text
 
             self._manifest_text = text
-            self.set_committed()
+            self.set_committed(True)
 
         return text
 
 
         return text
 
@@ -1531,7 +1609,7 @@ class Collection(RichCollectionBase):
                 stream_name = None
                 state = STREAM_NAME
 
                 stream_name = None
                 state = STREAM_NAME
 
-        self.set_committed()
+        self.set_committed(True)
 
     @synchronized
     def notify(self, event, collection, name, item):
 
     @synchronized
     def notify(self, event, collection, name, item):
@@ -1543,7 +1621,7 @@ class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
-    It falls under the umbrella of the root collection.
+    Subcollection locking falls under the umbrella lock of its root collection.
 
     """
 
 
     """
 
@@ -1581,7 +1659,7 @@ class Subcollection(RichCollectionBase):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
-        self._committed = False
+        self.set_committed(False)
         self.flush()
         self.parent.remove(self.name, recursive=True)
         self.parent = newparent
         self.flush()
         self.parent.remove(self.name, recursive=True)
         self.parent = newparent